#!/usr/bin/env python3 """ Hydrogen 21 cm drift-scan radiometer for the Genpix SkyWalker-1. Detects neutral hydrogen emission at 1420.405 MHz — directly in the IF range with no LNB required. Connect an L-band antenna (patch, helical, or horn) directly to the F-connector. The Milky Way's spiral arms create a velocity-dispersed emission profile detectable even with the BCM4500's ~346 kHz RBW. Earth's rotation provides a natural drift-scan across the sky. Usage: python h21cm.py # single sweep, print spectrum python h21cm.py --drift --duration 3600 # 1-hour drift scan python h21cm.py --drift --motor-step 5 # step motor between sweeps python h21cm.py --output data.csv # log to CSV The c in 21 cm stands for centimeters. The frequency (1420.405 MHz) comes from the hyperfine transition in neutral hydrogen — when the electron's spin flips relative to the proton. This is the most fundamental spectral line in radio astronomy, and you can detect it with a $30 DVB-S dongle. """ import sys import os import argparse import time import csv import math from datetime import datetime, timezone sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from skywalker_lib import SkyWalker1, agc_to_power_db # Physical constants H1_FREQ_MHZ = 1420.405751 # Hydrogen 21 cm rest frequency C_KM_S = 299792.458 # Speed of light def freq_to_velocity(freq_mhz: float) -> float: """Convert observed frequency to radial velocity via Doppler shift. v = c * (f_rest - f_obs) / f_rest Positive velocity = receding (redshifted, lower frequency). Negative velocity = approaching (blueshifted, higher frequency). """ return C_KM_S * (H1_FREQ_MHZ - freq_mhz) / H1_FREQ_MHZ def sweep_h1_band(sw: SkyWalker1, center_mhz: float = H1_FREQ_MHZ, span_mhz: float = 4.0, step_mhz: float = 0.5, dwell_ms: int = 50, averages: int = 1) -> dict: """Sweep the hydrogen line band and return power measurements. Higher dwell_ms and multiple averages improve SNR for this weak signal. Default 50ms dwell is 5x longer than typical satellite sweeps. Returns dict with frequencies, powers, velocities, and statistics. """ start = center_mhz - span_mhz / 2 stop = center_mhz + span_mhz / 2 # Accumulate multiple sweeps for averaging all_powers = None for avg in range(averages): freqs, powers, raw = sw.sweep_spectrum( start, stop, step_mhz=step_mhz, dwell_ms=dwell_ms, sr_ksps=1000, mod_index=0, fec_index=5, ) if all_powers is None: all_powers = [0.0] * len(powers) for i in range(len(powers)): all_powers[i] += powers[i] # Average avg_powers = [p / averages for p in all_powers] # Calculate velocities velocities = [freq_to_velocity(f) for f in freqs] # Baseline: edges of the band should be "empty" (no hydrogen) edge_count = max(2, len(avg_powers) // 5) baseline = (sum(avg_powers[:edge_count]) + sum(avg_powers[-edge_count:])) / (2 * edge_count) # Excess power above baseline excess = [p - baseline for p in avg_powers] # Find peak excess (the hydrogen line center) peak_idx = max(range(len(excess)), key=lambda i: excess[i]) peak_freq = freqs[peak_idx] peak_excess = excess[peak_idx] peak_velocity = velocities[peak_idx] return { "timestamp": datetime.now(timezone.utc).isoformat(), "freqs_mhz": freqs, "powers_db": avg_powers, "velocities_km_s": velocities, "excess_db": excess, "baseline_db": baseline, "peak_freq_mhz": peak_freq, "peak_excess_db": peak_excess, "peak_velocity_km_s": peak_velocity, "averages": averages, "dwell_ms": dwell_ms, } def sweep_control_band(sw: SkyWalker1, step_mhz: float = 0.5, dwell_ms: int = 50) -> dict: """Sweep a control band (1430-1434 MHz) where no hydrogen is expected. Comparing the control band to the hydrogen band reveals whether a detected power bump is real emission or just system noise variation. """ freqs, powers, _ = sw.sweep_spectrum( 1430.0, 1434.0, step_mhz=step_mhz, dwell_ms=dwell_ms, sr_ksps=1000, mod_index=0, fec_index=5, ) mean_power = sum(powers) / len(powers) if powers else 0 return { "control_freqs_mhz": freqs, "control_powers_db": powers, "control_mean_db": mean_power, } def print_spectrum(result: dict, show_velocity: bool = True) -> None: """Print an ASCII spectrum of the hydrogen band.""" freqs = result["freqs_mhz"] excess = result["excess_db"] velocities = result["velocities_km_s"] baseline = result["baseline_db"] # Scale for display max_excess = max(excess) if excess else 1.0 min_excess = min(excess) span = max(max_excess - min_excess, 0.5) print(f"\n Hydrogen 21 cm Spectrum") print(f" Baseline: {baseline:.2f} dB | Peak excess: {result['peak_excess_db']:.2f} dB") print(f" Peak at {result['peak_freq_mhz']:.3f} MHz ({result['peak_velocity_km_s']:+.1f} km/s)") print() bar_width = 50 for i in range(len(freqs)): f = freqs[i] e = excess[i] v = velocities[i] # Normalize to bar width filled = int((e - min_excess) / span * bar_width) filled = max(0, min(filled, bar_width)) bar = '#' * filled + '-' * (bar_width - filled) # Mark the hydrogen rest frequency marker = " *" if abs(f - H1_FREQ_MHZ) < 0.3 else " " if show_velocity: print(f" {f:8.3f} MHz {v:+7.1f} km/s [{bar}] {e:+.2f} dB{marker}") else: print(f" {f:8.3f} MHz [{bar}] {e:+.2f} dB{marker}") print() print(" * = hydrogen rest frequency (1420.405 MHz)") def drift_scan(sw: SkyWalker1, duration_secs: float, interval_secs: float, step_mhz: float, dwell_ms: int, averages: int, motor_step: int, output_path: str | None) -> None: """Run a drift scan: repeated sweeps over time. Earth's rotation naturally scans the sky. Each sweep captures the hydrogen profile at the current sky position. Over hours, you trace out the galactic plane. """ csv_writer = None csv_file = None header_written = False if output_path: csv_file = open(output_path, 'w', newline='') csv_writer = csv.writer(csv_file) start_time = time.time() scan_num = 0 try: while time.time() - start_time < duration_secs: scan_num += 1 elapsed = time.time() - start_time remaining = duration_secs - elapsed print(f"\n--- Scan #{scan_num} (elapsed {elapsed:.0f}s, " f"remaining {remaining:.0f}s) ---") # Motor step between scans (for declination scanning) if motor_step and scan_num > 1: print(f" Stepping motor {motor_step} steps east...") sw.motor_drive_east(motor_step) time.sleep(1.0) result = sweep_h1_band(sw, step_mhz=step_mhz, dwell_ms=dwell_ms, averages=averages) print_spectrum(result, show_velocity=True) # Write CSV if csv_writer: if not header_written: csv_writer.writerow([ "timestamp", "scan_num", "freq_mhz", "power_db", "excess_db", "velocity_km_s", "baseline_db", ]) header_written = True for i in range(len(result["freqs_mhz"])): csv_writer.writerow([ result["timestamp"], scan_num, f"{result['freqs_mhz'][i]:.3f}", f"{result['powers_db'][i]:.3f}", f"{result['excess_db'][i]:.3f}", f"{result['velocities_km_s'][i]:.1f}", f"{result['baseline_db']:.3f}", ]) csv_file.flush() # Wait for next scan if remaining > interval_secs: print(f" Next scan in {interval_secs:.0f}s...") time.sleep(interval_secs) except KeyboardInterrupt: print("\n Drift scan interrupted") finally: if csv_file: csv_file.close() print(f" Data saved to {output_path}") def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( prog="h21cm.py", description="Hydrogen 21 cm line radiometer for SkyWalker-1", formatter_class=argparse.RawDescriptionHelpFormatter, epilog="""\ examples: %(prog)s # single sweep, print spectrum %(prog)s --averages 8 # 8x averaging for better SNR %(prog)s --drift --duration 3600 # 1-hour drift scan %(prog)s --drift --motor-step 5 # step motor between sweeps %(prog)s --output h21cm-data.csv # log to CSV %(prog)s --control # include control band comparison notes: - Connect an L-band antenna directly to the F-connector (no LNB) - LNB power is disabled automatically for direct input - Hydrogen emission is weak; use --averages 4-16 for best results - The --dwell option increases per-step integration time (default 50ms) - Earth rotation provides natural sky drift at ~15 deg/hour """, ) parser.add_argument('-v', '--verbose', action='store_true', help="Show raw USB traffic") parser.add_argument('--center', type=float, default=H1_FREQ_MHZ, help=f"Center frequency in MHz (default: {H1_FREQ_MHZ})") parser.add_argument('--span', type=float, default=4.0, help="Frequency span in MHz (default: 4.0)") parser.add_argument('--step', type=float, default=0.5, help="Frequency step in MHz (default: 0.5)") parser.add_argument('--dwell', type=int, default=50, help="Dwell time per step in ms (default: 50)") parser.add_argument('--averages', type=int, default=1, help="Number of sweeps to average (default: 1)") parser.add_argument('--output', '-o', type=str, default=None, help="CSV output file path") parser.add_argument('--control', action='store_true', help="Include control band (1430-1434 MHz) for comparison") parser.add_argument('--no-velocity', action='store_true', help="Don't show velocity axis in spectrum display") drift_group = parser.add_argument_group('drift scan') drift_group.add_argument('--drift', action='store_true', help="Enable drift scan mode (repeated sweeps)") drift_group.add_argument('--duration', type=float, default=3600, help="Drift scan duration in seconds (default: 3600)") drift_group.add_argument('--interval', type=float, default=60, help="Seconds between sweeps (default: 60)") drift_group.add_argument('--motor-step', type=int, default=0, help="Motor steps between sweeps (0=no motor, default: 0)") return parser def main(): parser = build_parser() args = parser.parse_args() with SkyWalker1(verbose=args.verbose) as sw: sw.ensure_booted() # Disable LNB power for direct input sw.start_intersil(on=False) print("LNB power disabled (direct L-band input mode)") if args.drift: drift_scan(sw, duration_secs=args.duration, interval_secs=args.interval, step_mhz=args.step, dwell_ms=args.dwell, averages=args.averages, motor_step=args.motor_step, output_path=args.output) else: # Single sweep print(f"\nSweeping {args.center - args.span/2:.1f} - " f"{args.center + args.span/2:.1f} MHz " f"(step={args.step} MHz, dwell={args.dwell}ms, " f"avg={args.averages}x)") result = sweep_h1_band(sw, center_mhz=args.center, span_mhz=args.span, step_mhz=args.step, dwell_ms=args.dwell, averages=args.averages) print_spectrum(result, show_velocity=not args.no_velocity) if args.control: print(" Control band (1430-1434 MHz, no hydrogen expected):") ctrl = sweep_control_band(sw, step_mhz=args.step, dwell_ms=args.dwell) print(f" Control mean: {ctrl['control_mean_db']:.2f} dB") print(f" H1 baseline: {result['baseline_db']:.2f} dB") diff = result["peak_excess_db"] print(f" H1 peak excess above baseline: {diff:+.2f} dB") # Write single sweep to CSV if requested if args.output: with open(args.output, 'w', newline='') as f: writer = csv.writer(f) writer.writerow([ "freq_mhz", "power_db", "excess_db", "velocity_km_s", "baseline_db", ]) for i in range(len(result["freqs_mhz"])): writer.writerow([ f"{result['freqs_mhz'][i]:.3f}", f"{result['powers_db'][i]:.3f}", f"{result['excess_db'][i]:.3f}", f"{result['velocities_km_s'][i]:.1f}", f"{result['baseline_db']:.3f}", ]) print(f" Data saved to {args.output}") if __name__ == "__main__": main()