Fix enumeration config initialization and add test script

- Fix SetEnumerationConfig to create detector if not exists
  Previously, the config would be silently discarded if called before
  the detector was lazily initialized by GetEnumerationDetector

- Add test_enumeration.py script for sandbox testing
  Includes fire-and-forget mode (--no-wait) for proper scanner simulation
This commit is contained in:
Ryan Malloy 2025-12-07 15:39:30 -07:00
parent c73fa9d3d1
commit 95a794ba69
2 changed files with 162 additions and 6 deletions

View File

@ -99,17 +99,26 @@ func GetEnumerationDetector(logger *zap.Logger) *EnumerationDetector {
} }
// SetEnumerationConfig updates the global detector configuration // SetEnumerationConfig updates the global detector configuration
// If the detector doesn't exist yet, it creates one with the given config
func SetEnumerationConfig(config EnumerationConfig) { func SetEnumerationConfig(config EnumerationConfig) {
enumDetectorMu.Lock() enumDetectorMu.Lock()
defer enumDetectorMu.Unlock() defer enumDetectorMu.Unlock()
if globalEnumDetector != nil { if globalEnumDetector == nil {
globalEnumDetector.config = config // Create detector with the provided config
// Rebuild exempt set globalEnumDetector = &EnumerationDetector{
globalEnumDetector.exemptSet = make(map[string]bool) config: config,
for _, ext := range config.ExemptExtensions { tracker: make(map[string]*ExtensionAttempts),
globalEnumDetector.exemptSet[ext] = true logger: zap.NewNop(), // Will be replaced on first use
} }
} else {
globalEnumDetector.config = config
}
// Rebuild exempt set
globalEnumDetector.exemptSet = make(map[string]bool)
for _, ext := range config.ExemptExtensions {
globalEnumDetector.exemptSet[ext] = true
} }
} }

View File

@ -0,0 +1,147 @@
#!/usr/bin/env python3
"""
SIP Extension Enumeration Test for SIP Guardian
Simulates SIPVicious svwar-style sequential extension scanning.
Should trigger enumeration detection after threshold is reached.
"""
import socket
import time
import argparse
import random
import string
def generate_call_id():
return ''.join(random.choices(string.ascii_letters + string.digits, k=32))
def generate_branch():
return 'z9hG4bK' + ''.join(random.choices(string.ascii_letters + string.digits, k=16))
def create_register_request(target_host: str, target_port: int, extension: str, from_ip: str, cseq: int) -> bytes:
"""Create a SIP REGISTER request targeting an extension.
Uses REGISTER method with incrementing CSeq to avoid triggering
the 'cseq-flood' pattern detection (which looks for 'CSeq: 1 OPTIONS').
"""
call_id = generate_call_id()
branch = generate_branch()
tag = ''.join(random.choices(string.digits, k=8))
request = f"""REGISTER sip:{target_host}:{target_port} SIP/2.0\r
Via: SIP/2.0/UDP {from_ip}:5060;branch={branch}\r
Max-Forwards: 70\r
From: <sip:{extension}@{target_host}>;tag={tag}\r
To: <sip:{extension}@{target_host}>\r
Call-ID: {call_id}@{from_ip}\r
CSeq: {cseq} REGISTER\r
Contact: <sip:{extension}@{from_ip}:5060>\r
Expires: 3600\r
User-Agent: SIPPhone/1.0\r
Content-Length: 0\r
\r
"""
return request.encode()
def send_request(sock: socket.socket, target: tuple, request: bytes, timeout: float = 1.0, wait_response: bool = True) -> str:
"""Send request and optionally receive response.
In fire-and-forget mode (wait_response=False), we just send the packet
without waiting for a response. This better simulates scanner behavior.
"""
try:
sock.sendto(request, target)
if not wait_response:
return "SENT"
sock.settimeout(timeout)
response, _ = sock.recvfrom(4096)
return response.decode()
except socket.timeout:
return "TIMEOUT"
except Exception as e:
return f"ERROR: {e}"
def main():
parser = argparse.ArgumentParser(description='Test enumeration detection')
parser.add_argument('target', help='Target host (e.g., localhost)')
parser.add_argument('-p', '--port', type=int, default=5060, help='Target port')
parser.add_argument('-s', '--start', type=int, default=300, help='Start extension (default 300 to avoid test-extension patterns)')
parser.add_argument('-e', '--end', type=int, default=315, help='End extension')
parser.add_argument('-d', '--delay', type=float, default=0.1, help='Delay between requests')
parser.add_argument('--sequential', action='store_true', help='Use sequential extensions (triggers pattern detection)')
parser.add_argument('--random', action='store_true', help='Use random extensions')
parser.add_argument('--no-wait', action='store_true', help='Fire-and-forget mode (don\'t wait for responses)')
args = parser.parse_args()
# Create UDP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('0.0.0.0', 0))
local_port = sock.getsockname()[1]
# Get local IP
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect((args.target, 80))
local_ip = s.getsockname()[0]
except:
local_ip = '127.0.0.1'
finally:
s.close()
target = (args.target, args.port)
print(f"🎯 Enumeration Test")
print(f" Target: {args.target}:{args.port}")
print(f" From: {local_ip}:{local_port}")
print(f" Extensions: {args.start} to {args.end}")
print(f" Mode: {'Sequential' if args.sequential else 'Random' if args.random else 'Linear'}")
print(f" Wait for response: {'No' if args.no_wait else 'Yes'}")
print("-" * 50)
extensions = list(range(args.start, args.end + 1))
if args.random:
random.shuffle(extensions)
blocked = False
for i, ext in enumerate(extensions, 1):
request = create_register_request(args.target, args.port, str(ext), local_ip, cseq=i)
response = send_request(sock, target, request, wait_response=not args.no_wait)
# Check if we got blocked (timeout or connection refused)
# In no-wait mode, we check later by sending a probe request
if args.no_wait:
status = "✓ SENT"
elif "TIMEOUT" in response or "ERROR" in response:
if not blocked:
print(f"\n⛔ BLOCKED after {i} requests (extension {ext})")
blocked = True
status = "❌ BLOCKED"
else:
# Parse response code
try:
status_line = response.split('\r\n')[0]
code = status_line.split()[1]
status = f"{code}"
except:
status = "? Unknown"
print(f" [{i:3d}] Extension {ext}: {status}")
if blocked:
# Try a few more to confirm we're blocked
if i > len(extensions) - 3:
break
time.sleep(args.delay)
sock.close()
print("-" * 50)
if blocked:
print("✅ Enumeration detection WORKING - attack was blocked!")
else:
print("⚠️ Attack completed without being blocked")
print(" (threshold may not have been reached)")
if __name__ == '__main__':
main()