#!/usr/bin/env python3 """ SIP Extension Enumeration Test for SIP Guardian Simulates SIPVicious svwar-style sequential extension scanning. Should trigger enumeration detection after threshold is reached. """ import socket import time import argparse import random import string def generate_call_id(): return ''.join(random.choices(string.ascii_letters + string.digits, k=32)) def generate_branch(): return 'z9hG4bK' + ''.join(random.choices(string.ascii_letters + string.digits, k=16)) def create_register_request(target_host: str, target_port: int, extension: str, from_ip: str, cseq: int) -> bytes: """Create a SIP REGISTER request targeting an extension. Uses REGISTER method with incrementing CSeq to avoid triggering the 'cseq-flood' pattern detection (which looks for 'CSeq: 1 OPTIONS'). """ call_id = generate_call_id() branch = generate_branch() tag = ''.join(random.choices(string.digits, k=8)) request = f"""REGISTER sip:{target_host}:{target_port} SIP/2.0\r Via: SIP/2.0/UDP {from_ip}:5060;branch={branch}\r Max-Forwards: 70\r From: ;tag={tag}\r To: \r Call-ID: {call_id}@{from_ip}\r CSeq: {cseq} REGISTER\r Contact: \r Expires: 3600\r User-Agent: SIPPhone/1.0\r Content-Length: 0\r \r """ return request.encode() def send_request(sock: socket.socket, target: tuple, request: bytes, timeout: float = 1.0, wait_response: bool = True) -> str: """Send request and optionally receive response. In fire-and-forget mode (wait_response=False), we just send the packet without waiting for a response. This better simulates scanner behavior. """ try: sock.sendto(request, target) if not wait_response: return "SENT" sock.settimeout(timeout) response, _ = sock.recvfrom(4096) return response.decode() except socket.timeout: return "TIMEOUT" except Exception as e: return f"ERROR: {e}" def main(): parser = argparse.ArgumentParser(description='Test enumeration detection') parser.add_argument('target', help='Target host (e.g., localhost)') parser.add_argument('-p', '--port', type=int, default=5060, help='Target port') parser.add_argument('-s', '--start', type=int, default=300, help='Start extension (default 300 to avoid test-extension patterns)') parser.add_argument('-e', '--end', type=int, default=315, help='End extension') parser.add_argument('-d', '--delay', type=float, default=0.1, help='Delay between requests') parser.add_argument('--sequential', action='store_true', help='Use sequential extensions (triggers pattern detection)') parser.add_argument('--random', action='store_true', help='Use random extensions') parser.add_argument('--no-wait', action='store_true', help='Fire-and-forget mode (don\'t wait for responses)') args = parser.parse_args() # Create UDP socket sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.bind(('0.0.0.0', 0)) local_port = sock.getsockname()[1] # Get local IP s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: s.connect((args.target, 80)) local_ip = s.getsockname()[0] except: local_ip = '127.0.0.1' finally: s.close() target = (args.target, args.port) print(f"šŸŽÆ Enumeration Test") print(f" Target: {args.target}:{args.port}") print(f" From: {local_ip}:{local_port}") print(f" Extensions: {args.start} to {args.end}") print(f" Mode: {'Sequential' if args.sequential else 'Random' if args.random else 'Linear'}") print(f" Wait for response: {'No' if args.no_wait else 'Yes'}") print("-" * 50) extensions = list(range(args.start, args.end + 1)) if args.random: random.shuffle(extensions) blocked = False for i, ext in enumerate(extensions, 1): request = create_register_request(args.target, args.port, str(ext), local_ip, cseq=i) response = send_request(sock, target, request, wait_response=not args.no_wait) # Check if we got blocked (timeout or connection refused) # In no-wait mode, we check later by sending a probe request if args.no_wait: status = "āœ“ SENT" elif "TIMEOUT" in response or "ERROR" in response: if not blocked: print(f"\nā›” BLOCKED after {i} requests (extension {ext})") blocked = True status = "āŒ BLOCKED" else: # Parse response code try: status_line = response.split('\r\n')[0] code = status_line.split()[1] status = f"āœ“ {code}" except: status = "? Unknown" print(f" [{i:3d}] Extension {ext}: {status}") if blocked: # Try a few more to confirm we're blocked if i > len(extensions) - 3: break time.sleep(args.delay) sock.close() print("-" * 50) if blocked: print("āœ… Enumeration detection WORKING - attack was blocked!") else: print("āš ļø Attack completed without being blocked") print(" (threshold may not have been reached)") if __name__ == '__main__': main()