skywalker-1/tools/arc_survey.py
Ryan Malloy a9dcf84c38 Add Phase 1 experimenter tools: MCP server, H21cm, beacon logger, arc survey
Four new tools transforming the SkyWalker-1 from satellite TV receiver into
a general-purpose RF observatory:

- skywalker-mcp: FastMCP server exposing 20 tools, 4 resources, 2 prompts.
  Thread-safe DeviceBridge with motor safety (continuous drive opt-in),
  input validation on all frequency/symbol rate/step parameters,
  try/finally on TS capture, path traversal sanitization, and reduced
  lock scope so emergency motor halt isn't blocked during long surveys.

- h21cm.py: Hydrogen 21 cm drift-scan radiometer at 1420.405 MHz with
  Doppler velocity calculation, control band comparison, and CSV output.

- beacon_logger.py: Long-term Ku-band beacon SNR/AGC logger with auto-relock,
  dual CSV/JSONL output, signal handlers, and systemd unit generation.

- arc_survey.py: Multi-satellite orbital arc census with USALS motor control,
  per-slot catalog persistence, resume support, and defensive motor halt
  on all error/interrupt paths.

Documentation: experimenter's roadmap guide + 4 tool reference pages (48 pages total).
2026-02-17 14:45:02 -07:00

452 lines
17 KiB
Python

#!/usr/bin/env python3
"""
Multi-satellite arc survey for the Genpix SkyWalker-1.
Automated "satellite census": points the dish motor to each known GEO
longitude, runs a full-band carrier survey at each position, and aggregates
results into a comprehensive sky map. The diff capability tracks changes
between survey runs.
Usage:
python arc_survey.py --observer-lon -96.8 --slots "97W,99W,101W,103W"
python arc_survey.py --observer-lon -96.8 --file slots.json
python arc_survey.py --observer-lon -96.8 --arc -120 -60 --step 3
python arc_survey.py --resume arc-survey-2026-02-17.json
The tool saves progress after each orbital slot, so interrupted surveys
can be resumed. Each slot's catalog is saved individually, and a summary
report covers the entire arc.
"""
import sys
import os
import argparse
import time
import json
from datetime import datetime, timezone
from pathlib import Path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from skywalker_lib import SkyWalker1, usals_angle
from survey_engine import SurveyEngine
from carrier_catalog import CarrierCatalog, CATALOG_DIR
# Common North American GEO orbital slots.
# Keys are the slot labels accepted by --slots; values are the slot longitude
# in degrees, using the same sign convention as --observer-lon (negative = west).
NA_ORBITAL_SLOTS = {
    "129W": -129.0, "125W": -125.0, "123W": -123.0, "121W": -121.0,
    "119W": -119.0, "118.7W": -118.7, "116.8W": -116.8, "114.9W": -114.9,
    "113W": -113.0, "111.1W": -111.1, "110W": -110.0, "107.3W": -107.3,
    "105W": -105.0, "103W": -103.0, "101W": -101.0, "99W": -99.0,
    "97W": -97.0, "95W": -95.0, "93W": -93.0, "91W": -91.0,
    "89W": -89.0, "87W": -87.0, "85W": -85.0, "83W": -83.0,
    "82W": -82.0, "79W": -79.0, "77W": -77.0, "75W": -75.0,
    "72.7W": -72.7, "70W": -70.0, "67W": -67.0, "65W": -65.0,
    "63W": -63.0, "61.5W": -61.5, "58W": -58.0, "55.5W": -55.5,
}
# Arc survey state files live in an "arc-surveys" directory next to CATALOG_DIR.
ARC_SURVEY_DIR = CATALOG_DIR.parent / "arc-surveys"
class ArcSurvey:
    """Multi-position orbital arc survey with persistence and resume.

    For each requested orbital slot the dish motor is driven to the slot,
    a full carrier scan is run through ``SurveyEngine``, and the per-slot
    totals are folded into a JSON-serialisable state dict that is written
    to disk after every slot.
    """

    def __init__(self, sw: SkyWalker1, observer_lon: float,
                 observer_lat: float = 0.0, settle_time: float = 15.0):
        """Bind the survey to a receiver and an observer position.

        sw: receiver handle used for the motor and signal commands
        observer_lon, observer_lat: observer position handed to usals_angle()
        settle_time: minimum number of seconds to wait after a motor move
        """
        self.sw = sw
        self.observer_lon = observer_lon
        self.observer_lat = observer_lat
        self.settle_time = settle_time

    def survey_slot(self, name: str, sat_lon: float,
                    coarse_step: float = 5.0,
                    band: str = "", pol: str = "",
                    callback=None) -> CarrierCatalog:
        """Survey a single orbital slot: move dish, wait, run survey.

        name: label for the slot, used in messages and catalog metadata
        sat_lon: satellite longitude handed to the motor and usals_angle()
        coarse_step: forwarded to SurveyEngine.run_full_scan()
        band, pol: free-form labels stored on the returned catalog
        callback: optional ``callback(stage, percent, message)`` progress hook

        Returns the catalog produced by the scan, annotated with the slot
        name, labels and motor angle.
        """
        # Calculate motor angle; a negative angle is reported as "west".
        angle = usals_angle(self.observer_lon, sat_lon, self.observer_lat)
        direction = "west" if angle < 0 else "east"
        if callback:
            callback("moving", 0,
                     f"Moving to {name} ({sat_lon:.1f}), "
                     f"angle {abs(angle):.1f} deg {direction}")

        # Command the motor
        self.sw.motor_goto_x(self.observer_lon, sat_lon)

        # Wait for motor to settle (larger angles need more time)
        settle = max(self.settle_time, abs(angle) * 0.3)
        if callback:
            callback("settling", 20, f"Settling {settle:.0f}s...")
        time.sleep(settle)

        # Read the signal monitor once so the operator can see whether any
        # RF energy is present before the long scan starts.
        sig = self.sw.signal_monitor()
        if callback:
            callback("signal_check", 30,
                     f"AGC1={sig['agc1']}, power={sig['power_db']:.1f} dB")

        # The scan's own 0-100% progress is mapped onto the remaining
        # 30-100% of this slot's progress.
        def survey_cb(stage, pct, msg):
            overall_pct = 30 + int(pct * 0.7)
            if callback:
                callback(stage, overall_pct, msg)

        engine = SurveyEngine(self.sw, callback=survey_cb)
        catalog = engine.run_full_scan(
            coarse_step=coarse_step,
            ts_capture_secs=2.0,
        )
        catalog.name = f"{name} ({sat_lon:.1f})"
        catalog.band = band
        catalog.pol = pol
        catalog.notes = (f"Arc survey position: {name}, "
                         f"observer: {self.observer_lon:.2f} lon, "
                         f"motor angle: {angle:.2f} deg")
        if callback:
            callback("complete", 100,
                     f"{name}: {len(catalog.carriers)} carriers, "
                     f"{sum(1 for c in catalog.carriers if c.locked)} locked")
        return catalog

    def _halt_motor(self) -> bool:
        """Stop the motor on a best-effort basis; return True on success.

        Called only on error/interrupt paths, so a failure here is reported
        but never raised: it must not mask the original problem or prevent
        progress from being saved.
        """
        try:
            self.sw.motor_halt()
        except Exception as exc:
            print(f" Warning: motor halt failed: {exc}")
            return False
        return True

    def run_arc(self, slots: list[tuple[str, float]],
                coarse_step: float = 5.0,
                band: str = "", pol: str = "",
                save_individual: bool = True,
                resume_state: dict | None = None) -> dict:
        """Survey an entire arc of orbital slots.

        slots: list of (name, sat_lon) tuples
        coarse_step, band, pol: forwarded to survey_slot()
        save_individual: also save each slot's catalog to its own file
        resume_state: previous arc survey state dict for resuming; slots
            already listed under "completed_slots" are not surveyed again

        Returns a complete arc survey result dict. The dict is also written
        to a date-named file in ARC_SURVEY_DIR after every slot. The full
        ordered slot list is recorded under "slots" so that slots which were
        never attempted remain recoverable from the saved state.

        A slot that raises is recorded under "skipped_slots" and the survey
        continues; Ctrl-C halts the motor, saves progress and returns early
        with "interrupted_at" set.
        """
        ARC_SURVEY_DIR.mkdir(parents=True, exist_ok=True)
        date_str = datetime.now().strftime("%Y-%m-%d")
        slot_list = [[name, sat_lon] for name, sat_lon in slots]

        # Initialize or resume state
        if resume_state:
            state = resume_state
            # Tolerate hand-edited or partial state files.
            state.setdefault("completed_slots", {})
            state.setdefault("skipped_slots", {})
            summary = state.setdefault("summary", {})
            for key in ("total_carriers", "total_locked", "total_services"):
                summary.setdefault(key, 0)
            state.setdefault("slots", slot_list)
            # The previous interruption is being resolved by this run.
            state.pop("interrupted_at", None)
        else:
            state = {
                "started": datetime.now(timezone.utc).isoformat(),
                "observer_lon": self.observer_lon,
                "observer_lat": self.observer_lat,
                "total_slots": len(slots),
                "slots": slot_list,
                "completed_slots": {},
                "skipped_slots": {},
                "summary": {
                    "total_carriers": 0,
                    "total_locked": 0,
                    "total_services": 0,
                },
            }
        completed_names = set(state["completed_slots"])
        state_path = ARC_SURVEY_DIR / f"arc-survey-{date_str}.json"

        def progress_cb(stage, pct, msg):
            print(f" [{pct:3d}%] {stage}: {msg}")

        for i, (name, sat_lon) in enumerate(slots):
            if name in completed_names:
                print(f" [{i+1}/{len(slots)}] Skipping {name} (already surveyed)")
                continue
            print(f"\n [{i+1}/{len(slots)}] Surveying {name} ({sat_lon:.1f} lon)")
            try:
                catalog = self.survey_slot(
                    name, sat_lon,
                    coarse_step=coarse_step,
                    band=band, pol=pol,
                    callback=progress_cb,
                )

                # Save individual catalog
                slot_filename = None
                if save_individual:
                    slot_filename = f"arc-{date_str}-{name.replace('.', '_')}.json"
                    cat_path = catalog.save(slot_filename)
                    print(f" Saved: {cat_path}")

                # Update state
                carrier_count = len(catalog.carriers)
                locked_count = sum(1 for c in catalog.carriers if c.locked)
                service_count = sum(len(c.services) for c in catalog.carriers)
                state["completed_slots"][name] = {
                    "sat_lon": sat_lon,
                    "completed": datetime.now(timezone.utc).isoformat(),
                    "carriers": carrier_count,
                    "locked": locked_count,
                    "services": service_count,
                    "catalog_file": slot_filename,
                }
                state["summary"]["total_carriers"] += carrier_count
                state["summary"]["total_locked"] += locked_count
                state["summary"]["total_services"] += service_count
                # A slot that failed in an earlier run is no longer skipped.
                state["skipped_slots"].pop(name, None)
                # Guards against double-counting if a name repeats in slots.
                completed_names.add(name)
            except KeyboardInterrupt:
                print(f"\n Survey interrupted at {name}")
                halted = self._halt_motor()
                state["interrupted_at"] = name
                _save_state(state, state_path)
                if halted:
                    print(f" Motor halted. Progress saved to {state_path}")
                else:
                    print(f" Progress saved to {state_path}")
                print(f" Resume with: python arc_survey.py --resume {state_path}")
                return state
            except Exception as e:
                # One bad slot must not abort the rest of the arc.
                print(f" Error at {name}: {e}")
                self._halt_motor()
                state["skipped_slots"][name] = {
                    "sat_lon": sat_lon,
                    "error": str(e),
                }

            # Save state after each slot for resume capability
            _save_state(state, state_path)

        # Final summary
        state["completed"] = datetime.now(timezone.utc).isoformat()
        _save_state(state, state_path)
        return state
def _save_state(state: dict, path: Path) -> None:
    """Save arc survey state to JSON, replacing *path* atomically.

    The state is written to a temporary sibling file and then moved over
    *path* with os.replace(). An interruption in the middle of a save
    therefore leaves the previous, complete state file in place instead of
    a truncated one.

    state: JSON-serialisable survey state
    path: destination file (a ``Path`` or a plain string)

    Raises whatever the underlying write raises (OSError, TypeError for
    non-serialisable values); the temporary file is removed first.
    """
    path = Path(path)
    tmp_path = path.with_name(path.name + ".tmp")
    try:
        with open(tmp_path, 'w', encoding="utf-8") as f:
            json.dump(state, f, indent=2)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, path)
    except BaseException:
        # Includes KeyboardInterrupt: never leave a half-written temp file.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise
def parse_slot_string(slot_str: str) -> list[tuple[str, float]]:
    """Parse a comma-separated slot string like '97W,99W,101W'.

    Accepts formats: '97W', '97.5W', '3E', '-97', '-97.5'. Hemisphere
    suffixes are case-insensitive and empty items are ignored. West
    longitudes are returned as negative numbers.

    Returns a list of (name, longitude) tuples in input order.

    Raises ValueError naming the offending item when it is not a finite
    number, or when a signed number is combined with a hemisphere suffix
    (e.g. '-97W', whose intended direction is ambiguous).
    """
    def to_longitude(text: str, token: str) -> float:
        # Convert text to a finite float or fail with a message that
        # identifies which item of the list was wrong.
        try:
            value = float(text)
        except ValueError:
            value = float("nan")
        if value != value or value in (float("inf"), float("-inf")):
            raise ValueError(
                f"Invalid orbital slot {token!r}: "
                f"expected a form like '97W', '3E' or '-97.5'")
        return value

    slots = []
    for raw in slot_str.split(','):
        token = raw.strip()
        if not token:
            continue
        label = token.upper()
        if label in NA_ORBITAL_SLOTS:
            slots.append((label, NA_ORBITAL_SLOTS[label]))
        elif label.endswith(('W', 'E')):
            hemisphere = label[-1]
            magnitude_text = label[:-1].strip()
            if magnitude_text.startswith('-'):
                raise ValueError(
                    f"Invalid orbital slot {token!r}: "
                    f"use either a sign or a W/E suffix, not both")
            magnitude = to_longitude(magnitude_text, token)
            lon = -magnitude if hemisphere == 'W' else magnitude
            slots.append((magnitude_text + hemisphere, lon))
        else:
            lon = to_longitude(token, token)
            name = f"{abs(lon):.1f}{'W' if lon < 0 else 'E'}"
            slots.append((name, lon))
    return slots
def generate_arc_range(start_lon: float, stop_lon: float,
                       step: float) -> list[tuple[str, float]]:
    """Generate orbital slots at regular intervals across an arc.

    start_lon, stop_lon: inclusive arc limits in degrees (negative = west)
    step: spacing between slots in degrees; must be greater than zero

    Returns (name, longitude) tuples from start_lon up to and including
    stop_lon when it falls on the grid. An empty list is returned when
    stop_lon is below start_lon.

    Raises ValueError if step is not a positive finite number, which would
    otherwise never reach stop_lon.
    """
    if not 0 < step < float("inf"):
        raise ValueError(f"step must be a positive number, got {step!r}")
    if stop_lon < start_lon:
        return []

    # Derive the slot count once and compute each longitude from its index
    # so float error does not accumulate across the arc; the small epsilon
    # keeps an endpoint that sits on the grid (e.g. 0.3 with step 0.1).
    count = int((stop_lon - start_lon) / step + 1e-9) + 1
    slots = []
    for index in range(count):
        # "+ 0.0" turns a rounded -0.0 into 0.0 so it is labelled and
        # stored consistently.
        lon = round(start_lon + index * step, 6) + 0.0
        name = f"{abs(lon):.1f}{'W' if lon < 0 else 'E'}"
        slots.append((name, lon))
    return slots
def print_summary(state: dict) -> None:
    """Print a human-readable arc survey summary.

    state: arc survey state dict with "observer_lon", "total_slots",
        "completed_slots" and "summary"; "skipped_slots" is optional.

    Prints the overall totals, then each skipped slot with the error that
    was recorded for it, then one line per completed slot ordered by
    satellite longitude.
    """
    completed = state.get("completed_slots", {})
    skipped = state.get("skipped_slots") or {}
    totals = state.get("summary", {})

    print("\n Arc Survey Summary")
    print(" ==================")
    print(f" Observer: {state['observer_lon']:.2f} lon")
    print(f" Slots surveyed: {len(completed)} / {state['total_slots']}")
    print(f" Total carriers: {totals.get('total_carriers', 0)}")
    print(f" Total locked: {totals.get('total_locked', 0)}")
    print(f" Total services: {totals.get('total_services', 0)}")
    if skipped:
        print(f" Skipped: {len(skipped)}")
        # Show why each slot was skipped; during a long run the original
        # error line has usually scrolled away.
        for name, info in skipped.items():
            print(f" {name:>8s}: {info.get('error', 'unknown error')}")

    print("\n Per-slot results:")
    by_longitude = sorted(completed.items(), key=lambda item: item[1]["sat_lon"])
    for name, info in by_longitude:
        lock_str = f"{info['locked']}/{info['carriers']}"
        svc_str = f"{info['services']} svc" if info['services'] else ""
        print(f" {name:>8s} ({info['sat_lon']:+7.1f}): "
              f"{lock_str:>7s} locked {svc_str}")
def _positive_float(text: str) -> float:
    """argparse type: a finite float strictly greater than zero."""
    try:
        value = float(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid number: {text!r}") from None
    if not 0 < value < float("inf"):
        raise argparse.ArgumentTypeError(f"must be greater than 0, got {text!r}")
    return value


def _non_negative_float(text: str) -> float:
    """argparse type: a finite float greater than or equal to zero."""
    try:
        value = float(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid number: {text!r}") from None
    if not 0 <= value < float("inf"):
        raise argparse.ArgumentTypeError(f"must not be negative, got {text!r}")
    return value


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the arc survey tool.

    The slot source options (--slots, --file, --arc, --resume, --list-slots)
    are mutually exclusive. Step sizes must be positive and the settle time
    must not be negative; argparse rejects anything else with a usage error
    instead of letting a zero or negative value reach the survey loop.
    """
    parser = argparse.ArgumentParser(
        prog="arc_survey.py",
        description="Multi-satellite arc survey for SkyWalker-1",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""\
examples:
# Survey specific slots (North American arc)
%(prog)s --observer-lon -96.8 --slots "97W,99W,101W,103W"
# Survey an arc range at 3-degree intervals
%(prog)s --observer-lon -96.8 --arc -120 -60 --step 3
# Load slots from a JSON file
%(prog)s --observer-lon -96.8 --file my-slots.json
# Resume an interrupted survey
%(prog)s --resume ~/.skywalker1/arc-surveys/arc-survey-2026-02-17.json
# List common North American orbital slots
%(prog)s --list-slots
slot file format (JSON):
[
{"name": "97W", "lon": -97.0},
{"name": "99W", "lon": -99.0}
]
notes:
- Motor settle time scales with angle (min 15s, + 0.3s per degree)
- Each slot takes 5-15 minutes depending on carrier density
- Progress is saved after each slot; Ctrl-C to pause safely
- Individual catalogs saved to ~/.skywalker1/surveys/
- Arc survey state saved to ~/.skywalker1/arc-surveys/
""",
    )
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('--observer-lon', type=float,
                        help="Observer longitude (negative=west, e.g. -96.8)")
    parser.add_argument('--observer-lat', type=float, default=0.0,
                        help="Observer latitude (default: 0.0)")

    # Exactly one way of choosing the slots may be given per invocation.
    source = parser.add_mutually_exclusive_group()
    source.add_argument('--slots', type=str,
                        help="Comma-separated slot list (e.g. '97W,99W,101W')")
    source.add_argument('--file', type=str,
                        help="JSON file with slot definitions")
    source.add_argument('--arc', nargs=2, type=float, metavar=('START', 'STOP'),
                        help="Arc range in degrees longitude")
    source.add_argument('--resume', type=str,
                        help="Resume from a saved arc survey state file")
    source.add_argument('--list-slots', action='store_true',
                        help="List common NA orbital slots and exit")

    parser.add_argument('--step', type=_positive_float, default=3.0,
                        help="Step size for --arc mode (default: 3.0 degrees)")
    parser.add_argument('--coarse-step', type=_positive_float, default=5.0,
                        help="Coarse sweep step in MHz (default: 5.0)")
    parser.add_argument('--settle-time', type=_non_negative_float, default=15.0,
                        help="Minimum motor settle time in seconds (default: 15)")
    parser.add_argument('--pol', type=str, default="",
                        help="Polarization label (H/V, for catalog metadata)")
    parser.add_argument('--band', type=str, default="",
                        help="Band label (low/high, for catalog metadata)")
    return parser
def main():
    """Command-line entry point: pick the slot list, then run the survey.

    The slot list comes from exactly one of --slots, --file, --arc or
    --resume. Input problems are reported through the parser's usage error
    rather than a traceback. The planned motor angles are printed before
    the receiver is opened, and a summary is printed when the run ends.
    """
    parser = build_parser()
    args = parser.parse_args()

    if args.list_slots:
        print("Common North American GEO orbital slots:")
        for name in sorted(NA_ORBITAL_SLOTS, key=lambda n: NA_ORBITAL_SLOTS[n]):
            lon = NA_ORBITAL_SLOTS[name]
            print(f" {name:>8s} {lon:+7.1f}")
        return

    # Determine slot list
    resume_state = None
    if args.resume:
        with open(args.resume) as f:
            resume_state = json.load(f)
        observer_lon = resume_state["observer_lon"]
        observer_lat = resume_state.get("observer_lat", 0.0)
        completed = resume_state.get("completed_slots", {})
        skipped = resume_state.get("skipped_slots", {})

        recorded = resume_state.get("slots")
        if recorded:
            # Prefer the full ordered slot list when the state file has one:
            # it also covers slots that were never attempted before the
            # run was interrupted.
            slots = [(name, lon) for name, lon in recorded]
        else:
            # Otherwise only attempted slots are known; rebuild from them.
            slots = [(name, info["sat_lon"]) for name, info in completed.items()]
            slots += [(name, info["sat_lon"]) for name, info in skipped.items()
                      if name not in completed]
            # Sort by longitude
            slots.sort(key=lambda x: x[1])
        print(f"Resuming arc survey: {len(completed)} "
              f"of {len(slots)} slots completed")
    else:
        # 0.0 is a valid longitude, so test for "not given" explicitly.
        if args.observer_lon is None:
            parser.error("--observer-lon is required (or use --resume)")
        observer_lon = args.observer_lon
        observer_lat = args.observer_lat

        if args.slots:
            try:
                slots = parse_slot_string(args.slots)
            except ValueError as exc:
                parser.error(f"--slots: {exc}")
        elif args.file:
            with open(args.file) as f:
                data = json.load(f)
            try:
                slots = [(d["name"], float(d["lon"])) for d in data]
            except (KeyError, TypeError, ValueError) as exc:
                parser.error(f"--file: each entry needs 'name' and a numeric "
                             f"'lon' ({exc!r})")
        elif args.arc:
            if not args.step > 0:
                # A non-positive step can never reach the end of the arc.
                parser.error("--step must be greater than 0")
            start, stop = sorted(args.arc)
            slots = generate_arc_range(start, stop, args.step)
        else:
            parser.error("Specify --slots, --file, --arc, or --resume")

    if not slots:
        print("No orbital slots to survey", file=sys.stderr)
        sys.exit(1)

    print("Arc Survey")
    print(f" Observer: {observer_lon:.2f} lon, {observer_lat:.2f} lat")
    print(f" Orbital slots: {len(slots)}")
    for name, lon in slots:
        angle = usals_angle(observer_lon, lon, observer_lat)
        direction = "W" if angle < 0 else "E"
        print(f" {name:>8s} {lon:+7.1f} (motor: {abs(angle):.1f} deg {direction})")
    print()

    with SkyWalker1(verbose=args.verbose) as sw:
        sw.ensure_booted()
        survey = ArcSurvey(
            sw, observer_lon, observer_lat,
            settle_time=args.settle_time,
        )
        state = survey.run_arc(
            slots,
            coarse_step=args.coarse_step,
            band=args.band, pol=args.pol,
            resume_state=resume_state,
        )
    print_summary(state)


if __name__ == "__main__":
    main()