#!/usr/bin/env python3
"""
SIP Brute Force Simulation for SIP Guardian Testing

Simulates authentication failures to test rate limiting and banning.
Sends unauthenticated SIP REGISTER requests over UDP for a range of
extensions and reports when the target stops answering (timeout) or starts
answering with 403 Forbidden.
"""

import argparse
import random
import socket
import string
import time
from typing import List, Optional

# Port advertised in the Via and Contact headers.
SIP_PORT = 5060
# Seconds to wait for a response before treating the request as dropped.
RESPONSE_TIMEOUT = 2.0


def generate_call_id() -> str:
    """Generate a random 32-character alphanumeric SIP Call-ID."""
    return ''.join(random.choices(string.ascii_letters + string.digits, k=32))


def generate_branch() -> str:
    """Generate a random Via branch parameter.

    The value starts with the ``z9hG4bK`` prefix followed by 16 random
    alphanumeric characters.
    """
    return 'z9hG4bK' + ''.join(random.choices(string.ascii_letters + string.digits, k=16))


def create_register_request(target_host: str, target_port: int, extension: str, from_ip: str) -> bytes:
    """Create a SIP REGISTER request for ``extension`` at ``target_host``.

    Args:
        target_host: Host placed in the Request-URI and in the From/To URIs.
        target_port: Port placed in the Request-URI.
        extension: User part of the From/To/Contact URIs.
        from_ip: Local IP advertised in the Via, Call-ID and Contact headers.

    Returns:
        The encoded request, with CRLF line endings and an empty body.
    """
    call_id = generate_call_id()
    branch = generate_branch()
    tag = ''.join(random.choices(string.digits, k=8))

    # From/To carry the address-of-record being registered; without a URI the
    # request is malformed and ``extension`` would have no effect at all.
    address_of_record = f"sip:{extension}@{target_host}"

    lines = [
        f"REGISTER sip:{target_host}:{target_port} SIP/2.0",
        # ``rport`` asks the server to reply to the packet's real source port;
        # the sending socket uses an ephemeral port, not the advertised 5060.
        f"Via: SIP/2.0/UDP {from_ip}:{SIP_PORT};branch={branch};rport",
        "Max-Forwards: 70",
        f"From: <{address_of_record}>;tag={tag}",
        f"To: <{address_of_record}>",
        f"Call-ID: {call_id}@{from_ip}",
        "CSeq: 1 REGISTER",
        f"Contact: <sip:{extension}@{from_ip}:{SIP_PORT}>",
        "Expires: 3600",
        "User-Agent: BruteForcer/1.0",
        "Content-Length: 0",
    ]
    # Header section is terminated by an empty line (CRLF CRLF).
    return ("\r\n".join(lines) + "\r\n\r\n").encode()


def send_register(sock: socket.socket, target: tuple, request: bytes) -> str:
    """Send a REGISTER request and wait for one response datagram.

    Args:
        sock: Datagram socket used for sending and receiving.
        target: ``(host, port)`` tuple to send to.
        request: Encoded SIP request.

    Returns:
        The decoded response text, the literal string ``"TIMEOUT"`` when
        nothing arrives within the timeout, or ``"ERROR: <details>"`` when
        the socket operation fails.
    """
    try:
        sock.sendto(request, target)
        sock.settimeout(RESPONSE_TIMEOUT)
        response, _ = sock.recvfrom(4096)
    except socket.timeout:
        return "TIMEOUT"
    except OSError as e:
        return f"ERROR: {e}"
    # Undecodable bytes are replaced so a garbled reply is still reported
    # rather than turned into an error.
    return response.decode(errors='replace')


def parse_status_code(response: str) -> Optional[int]:
    """Return the status code from a SIP response's status line.

    Only the first line is inspected, so digits that appear in echoed
    headers (tags, Call-IDs, branches) cannot be mistaken for a status code.

    Returns:
        The three-digit status code, or ``None`` when the text does not start
        with a SIP status line (including the ``"TIMEOUT"`` and ``"ERROR: ..."``
        strings produced by ``send_register``).
    """
    status_line = response.split('\r\n', 1)[0]
    parts = status_line.split(' ', 2)
    if len(parts) < 2 or not parts[0].startswith('SIP/'):
        return None
    code = parts[1]
    if len(code) == 3 and code.isdecimal():
        return int(code)
    return None


def parse_extensions(spec: str) -> List[int]:
    """Parse an extension specification into a list of extension numbers.

    Args:
        spec: Either a single number (``"100"``) or an inclusive range
            (``"100-105"``).

    Returns:
        The extensions in ascending order.

    Raises:
        ValueError: If a part is not an integer or the range is reversed.
    """
    first, separator, last = spec.partition('-')
    if not separator:
        return [int(first)]
    start, end = int(first), int(last)
    if start > end:
        raise ValueError(f"range start {start} is greater than end {end}")
    return list(range(start, end + 1))


def detect_source_ip(target_host: str, target_port: int) -> str:
    """Return the local IP address the OS would use to reach the target.

    Connecting a datagram socket sends no packet; it only fixes the local
    address, which is then read back with ``getsockname``.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        probe.connect((target_host, target_port))
        return probe.getsockname()[0]


def main():
    """Parse command-line arguments and run the brute force simulation."""
    parser = argparse.ArgumentParser(description='SIP Brute Force Simulator')
    parser.add_argument('target', help='Target host (Caddy proxy)')
    parser.add_argument('-p', '--port', type=int, default=5060, help='Target port')
    parser.add_argument('-e', '--extensions', default='100-105', help='Extension range (e.g., 100-200)')
    parser.add_argument('-c', '--count', type=int, default=10, help='Attempts per extension')
    parser.add_argument('-d', '--delay', type=float, default=0.1, help='Delay between attempts')
    parser.add_argument('--udp', action='store_true', default=True, help='Use UDP (default)')
    parser.add_argument('--tcp', action='store_true', help='Use TCP')

    args = parser.parse_args()

    # Reject input that would otherwise produce a traceback or a run with
    # zero attempts (which would be reported as "not blocked").
    try:
        extensions = parse_extensions(args.extensions)
    except ValueError as e:
        parser.error(f"invalid extension range {args.extensions!r}: {e}")
    if args.count < 1:
        parser.error("--count must be at least 1")
    if args.delay < 0:
        parser.error("--delay must not be negative")

    if args.tcp:
        # Only the UDP transport is implemented; say so instead of silently
        # ignoring the flag.
        print("[!] --tcp is not implemented; continuing over UDP")

    # Get our IP
    try:
        from_ip = detect_source_ip(args.target, args.port)
    except OSError as e:
        parser.error(f"cannot determine route to {args.target}:{args.port}: {e}")

    print(f"[*] Starting brute force simulation")
    print(f"[*] Target: {args.target}:{args.port}")
    print(f"[*] Source IP: {from_ip}")
    print(f"[*] Extensions: {extensions}")
    print(f"[*] Attempts per extension: {args.count}")
    print()

    target = (args.target, args.port)
    attempts = 0
    blocked_at = None  # total attempt count at which blocking was first seen

    # The context manager closes the socket on every exit path, including
    # the early return taken when the target stops responding.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        for ext in extensions:
            for attempt_no in range(1, args.count + 1):
                request = create_register_request(args.target, args.port, str(ext), from_ip)
                response = send_register(sock, target, request)
                attempts += 1
                status = parse_status_code(response)

                if response == 'TIMEOUT':
                    if attempts == 1:
                        # Nothing was ever answered, so this is not evidence
                        # that the rate limiter reacted to this run.
                        print("[?] No response to the first request - target may be "
                              "unreachable or this IP may already be banned")
                        return
                    if blocked_at is None:
                        blocked_at = attempts
                    print(f"[!] BLOCKED after {attempts} attempts (extension {ext}, attempt {attempt_no})")
                    print(f"[+] SIP Guardian is working! Blocked after {blocked_at} attempts")
                    return
                elif status in (401, 407):
                    print(f"[*] Auth required: ext={ext} attempt={attempt_no} total={attempts}")
                elif status == 403:
                    print(f"[!] FORBIDDEN: ext={ext} - Connection blocked")
                    if blocked_at is None:
                        blocked_at = attempts
                else:
                    # Print first line of response
                    first_line = response.split('\r\n')[0] if response else 'No response'
                    print(f"[?] Response: {first_line} (ext={ext})")

                time.sleep(args.delay)

    if blocked_at is not None:
        # 403 responses were seen; the run was blocked even though the
        # target kept answering.
        print(f"\n[+] SIP Guardian is working! First 403 Forbidden after {blocked_at} attempts")
        return

    print(f"\n[*] Completed {attempts} attempts without being blocked")
    print("[!] SIP Guardian may not be working correctly")


if __name__ == '__main__':
    main()