From 95a794ba69a957a789185c514b85f751bc172d8b Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Sun, 7 Dec 2025 15:39:30 -0700 Subject: [PATCH] Fix enumeration config initialization and add test script - Fix SetEnumerationConfig to create detector if not exists Previously, the config would be silently discarded if called before the detector was lazily initialized by GetEnumerationDetector - Add test_enumeration.py script for sandbox testing Includes fire-and-forget mode (--no-wait) for proper scanner simulation --- enumeration.go | 21 ++-- sandbox/scripts/test_enumeration.py | 147 ++++++++++++++++++++++++++++ 2 files changed, 162 insertions(+), 6 deletions(-) create mode 100755 sandbox/scripts/test_enumeration.py diff --git a/enumeration.go b/enumeration.go index 749f8df..778c703 100644 --- a/enumeration.go +++ b/enumeration.go @@ -99,17 +99,26 @@ func GetEnumerationDetector(logger *zap.Logger) *EnumerationDetector { } // SetEnumerationConfig updates the global detector configuration +// If the detector doesn't exist yet, it creates one with the given config func SetEnumerationConfig(config EnumerationConfig) { enumDetectorMu.Lock() defer enumDetectorMu.Unlock() - if globalEnumDetector != nil { - globalEnumDetector.config = config - // Rebuild exempt set - globalEnumDetector.exemptSet = make(map[string]bool) - for _, ext := range config.ExemptExtensions { - globalEnumDetector.exemptSet[ext] = true + if globalEnumDetector == nil { + // Create detector with the provided config + globalEnumDetector = &EnumerationDetector{ + config: config, + tracker: make(map[string]*ExtensionAttempts), + logger: zap.NewNop(), // Will be replaced on first use } + } else { + globalEnumDetector.config = config + } + + // Rebuild exempt set + globalEnumDetector.exemptSet = make(map[string]bool) + for _, ext := range config.ExemptExtensions { + globalEnumDetector.exemptSet[ext] = true } } diff --git a/sandbox/scripts/test_enumeration.py b/sandbox/scripts/test_enumeration.py new file mode 100755 index 0000000..374d6a0 --- /dev/null +++ b/sandbox/scripts/test_enumeration.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python3 +""" +SIP Extension Enumeration Test for SIP Guardian + +Simulates SIPVicious svwar-style sequential extension scanning. +Should trigger enumeration detection after threshold is reached. +""" + +import socket +import time +import argparse +import random +import string + +def generate_call_id(): + return ''.join(random.choices(string.ascii_letters + string.digits, k=32)) + +def generate_branch(): + return 'z9hG4bK' + ''.join(random.choices(string.ascii_letters + string.digits, k=16)) + +def create_register_request(target_host: str, target_port: int, extension: str, from_ip: str, cseq: int) -> bytes: + """Create a SIP REGISTER request targeting an extension. + + Uses REGISTER method with incrementing CSeq to avoid triggering + the 'cseq-flood' pattern detection (which looks for 'CSeq: 1 OPTIONS'). + """ + call_id = generate_call_id() + branch = generate_branch() + tag = ''.join(random.choices(string.digits, k=8)) + + request = f"""REGISTER sip:{target_host}:{target_port} SIP/2.0\r +Via: SIP/2.0/UDP {from_ip}:5060;branch={branch}\r +Max-Forwards: 70\r +From: ;tag={tag}\r +To: \r +Call-ID: {call_id}@{from_ip}\r +CSeq: {cseq} REGISTER\r +Contact: \r +Expires: 3600\r +User-Agent: SIPPhone/1.0\r +Content-Length: 0\r +\r +""" + return request.encode() + +def send_request(sock: socket.socket, target: tuple, request: bytes, timeout: float = 1.0, wait_response: bool = True) -> str: + """Send request and optionally receive response. + + In fire-and-forget mode (wait_response=False), we just send the packet + without waiting for a response. This better simulates scanner behavior. + """ + try: + sock.sendto(request, target) + if not wait_response: + return "SENT" + sock.settimeout(timeout) + response, _ = sock.recvfrom(4096) + return response.decode() + except socket.timeout: + return "TIMEOUT" + except Exception as e: + return f"ERROR: {e}" + +def main(): + parser = argparse.ArgumentParser(description='Test enumeration detection') + parser.add_argument('target', help='Target host (e.g., localhost)') + parser.add_argument('-p', '--port', type=int, default=5060, help='Target port') + parser.add_argument('-s', '--start', type=int, default=300, help='Start extension (default 300 to avoid test-extension patterns)') + parser.add_argument('-e', '--end', type=int, default=315, help='End extension') + parser.add_argument('-d', '--delay', type=float, default=0.1, help='Delay between requests') + parser.add_argument('--sequential', action='store_true', help='Use sequential extensions (triggers pattern detection)') + parser.add_argument('--random', action='store_true', help='Use random extensions') + parser.add_argument('--no-wait', action='store_true', help='Fire-and-forget mode (don\'t wait for responses)') + args = parser.parse_args() + + # Create UDP socket + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.bind(('0.0.0.0', 0)) + local_port = sock.getsockname()[1] + + # Get local IP + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + s.connect((args.target, 80)) + local_ip = s.getsockname()[0] + except: + local_ip = '127.0.0.1' + finally: + s.close() + + target = (args.target, args.port) + + print(f"šŸŽÆ Enumeration Test") + print(f" Target: {args.target}:{args.port}") + print(f" From: {local_ip}:{local_port}") + print(f" Extensions: {args.start} to {args.end}") + print(f" Mode: {'Sequential' if args.sequential else 'Random' if args.random else 'Linear'}") + print(f" Wait for response: {'No' if args.no_wait else 'Yes'}") + print("-" * 50) + + extensions = list(range(args.start, args.end + 1)) + if args.random: + random.shuffle(extensions) + + blocked = False + for i, ext in enumerate(extensions, 1): + request = create_register_request(args.target, args.port, str(ext), local_ip, cseq=i) + response = send_request(sock, target, request, wait_response=not args.no_wait) + + # Check if we got blocked (timeout or connection refused) + # In no-wait mode, we check later by sending a probe request + if args.no_wait: + status = "āœ“ SENT" + elif "TIMEOUT" in response or "ERROR" in response: + if not blocked: + print(f"\nā›” BLOCKED after {i} requests (extension {ext})") + blocked = True + status = "āŒ BLOCKED" + else: + # Parse response code + try: + status_line = response.split('\r\n')[0] + code = status_line.split()[1] + status = f"āœ“ {code}" + except: + status = "? Unknown" + + print(f" [{i:3d}] Extension {ext}: {status}") + + if blocked: + # Try a few more to confirm we're blocked + if i > len(extensions) - 3: + break + + time.sleep(args.delay) + + sock.close() + + print("-" * 50) + if blocked: + print("āœ… Enumeration detection WORKING - attack was blocked!") + else: + print("āš ļø Attack completed without being blocked") + print(" (threshold may not have been reached)") + +if __name__ == '__main__': + main()