#!/usr/bin/env python3 """ Valid SIP Registration Test Tests that legitimate registrations pass through SIP Guardian successfully. """ import socket import time import argparse import random import string import hashlib def generate_call_id(): return ''.join(random.choices(string.ascii_letters + string.digits, k=32)) def generate_branch(): return 'z9hG4bK' + ''.join(random.choices(string.ascii_letters + string.digits, k=16)) def compute_digest_response(username: str, password: str, realm: str, nonce: str, uri: str, method: str = 'REGISTER') -> str: """Compute SIP Digest authentication response""" ha1 = hashlib.md5(f"{username}:{realm}:{password}".encode()).hexdigest() ha2 = hashlib.md5(f"{method}:{uri}".encode()).hexdigest() response = hashlib.md5(f"{ha1}:{nonce}:{ha2}".encode()).hexdigest() return response def create_register_with_auth(target_host: str, target_port: int, extension: str, password: str, from_ip: str, realm: str, nonce: str) -> bytes: """Create an authenticated SIP REGISTER request""" call_id = generate_call_id() branch = generate_branch() tag = ''.join(random.choices(string.digits, k=8)) uri = f"sip:{target_host}:{target_port}" response = compute_digest_response(extension, password, realm, nonce, uri) request = f"""REGISTER sip:{target_host}:{target_port} SIP/2.0\r Via: SIP/2.0/UDP {from_ip}:5060;branch={branch}\r Max-Forwards: 70\r From: ;tag={tag}\r To: \r Call-ID: {call_id}@{from_ip}\r CSeq: 2 REGISTER\r Contact: \r Authorization: Digest username="{extension}",realm="{realm}",nonce="{nonce}",uri="{uri}",response="{response}",algorithm=MD5\r Expires: 3600\r User-Agent: ValidClient/1.0\r Content-Length: 0\r \r """ return request.encode() def create_initial_register(target_host: str, target_port: int, extension: str, from_ip: str) -> bytes: """Create initial REGISTER without auth (to get challenge)""" call_id = generate_call_id() branch = generate_branch() tag = ''.join(random.choices(string.digits, k=8)) request = f"""REGISTER sip:{target_host}:{target_port} SIP/2.0\r Via: SIP/2.0/UDP {from_ip}:5060;branch={branch}\r Max-Forwards: 70\r From: ;tag={tag}\r To: \r Call-ID: {call_id}@{from_ip}\r CSeq: 1 REGISTER\r Contact: \r Expires: 3600\r User-Agent: ValidClient/1.0\r Content-Length: 0\r \r """ return request.encode() def parse_www_authenticate(response: str) -> tuple: """Parse WWW-Authenticate header to get realm and nonce""" for line in response.split('\r\n'): if line.lower().startswith('www-authenticate:'): # Extract realm realm_start = line.find('realm="') + 7 realm_end = line.find('"', realm_start) realm = line[realm_start:realm_end] if realm_start > 6 else '' # Extract nonce nonce_start = line.find('nonce="') + 7 nonce_end = line.find('"', nonce_start) nonce = line[nonce_start:nonce_end] if nonce_start > 6 else '' return realm, nonce return '', '' def main(): parser = argparse.ArgumentParser(description='Valid SIP Registration Test') parser.add_argument('target', help='Target host (Caddy proxy)') parser.add_argument('-p', '--port', type=int, default=5060, help='Target port') parser.add_argument('-e', '--extension', default='100', help='Extension to register') parser.add_argument('-s', '--secret', default='password123', help='Extension password') parser.add_argument('-r', '--repeat', type=int, default=1, help='Number of registration cycles') args = parser.parse_args() sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.connect((args.target, args.port)) from_ip = sock.getsockname()[0] sock.close() print(f"[*] Valid Registration Test") print(f"[*] Target: {args.target}:{args.port}") print(f"[*] Extension: {args.extension}") print(f"[*] Source IP: {from_ip}") print() sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) target = (args.target, args.port) for cycle in range(args.repeat): print(f"[*] Registration cycle {cycle + 1}/{args.repeat}") # Send initial REGISTER request = create_initial_register(args.target, args.port, args.extension, from_ip) sock.sendto(request, target) try: sock.settimeout(5.0) response, _ = sock.recvfrom(4096) response = response.decode() except socket.timeout: print("[!] TIMEOUT - Connection may be blocked") continue first_line = response.split('\r\n')[0] print(f"[*] Initial response: {first_line}") if '401' in response or '407' in response: # Parse auth challenge realm, nonce = parse_www_authenticate(response) print(f"[*] Got challenge: realm={realm}") # Send authenticated REGISTER auth_request = create_register_with_auth( args.target, args.port, args.extension, args.secret, from_ip, realm, nonce ) sock.sendto(auth_request, target) try: response, _ = sock.recvfrom(4096) response = response.decode() first_line = response.split('\r\n')[0] print(f"[*] Auth response: {first_line}") if '200' in response: print("[+] SUCCESS - Registration completed!") else: print(f"[!] Failed: {first_line}") except socket.timeout: print("[!] TIMEOUT after auth - may be blocked") elif '200' in response: print("[+] SUCCESS - Already registered or no auth required") else: print(f"[?] Unexpected response: {first_line}") if cycle < args.repeat - 1: time.sleep(2) sock.close() if __name__ == '__main__': main()