Major features: - Extension enumeration detection with 3 detection algorithms: - Max unique extensions threshold (default: 20 in 5 min) - Sequential pattern detection (e.g., 100,101,102...) - Rapid-fire detection (many extensions in short window) - Prometheus metrics for all SIP Guardian operations - SQLite persistent storage for bans and attack history - Webhook notifications for ban/unban/suspicious events - GeoIP-based country blocking with continent shortcuts - Per-method rate limiting with token bucket algorithm Bug fixes: - Fix whitelist count always reporting zero in stats - Fix whitelisted connections metric never incrementing - Fix Caddyfile config not being applied to shared guardian New files: - enumeration.go: Extension enumeration detector - enumeration_test.go: 14 comprehensive unit tests - metrics.go: Prometheus metrics handler - storage.go: SQLite persistence layer - webhooks.go: Webhook notification system - geoip.go: MaxMind GeoIP integration - ratelimit.go: Per-method rate limiting Testing: - sandbox/ contains complete Docker Compose test environment - All 14 enumeration tests pass
124 lines
4.4 KiB
Python
124 lines
4.4 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
SIP Brute Force Simulation for SIP Guardian Testing
|
|
|
|
Simulates authentication failures to test rate limiting and banning.
|
|
"""
|
|
|
|
import socket
|
|
import time
|
|
import argparse
|
|
import random
|
|
import string
|
|
|
|
def generate_call_id():
    """Return a fresh 32-character alphanumeric token for a SIP Call-ID.

    Characters are drawn with replacement from ASCII letters and digits
    via the ``random`` module, which is fine for test traffic but is not
    cryptographically secure.
    """
    alphabet = string.ascii_letters + string.digits
    picked = random.choices(alphabet, k=32)
    return ''.join(picked)
|
|
|
|
def generate_branch():
    """Return a random value for the ``branch`` parameter of a Via header.

    The result is the fixed prefix ``z9hG4bK`` (the RFC 3261 branch magic
    cookie) followed by 16 random ASCII letters/digits.
    """
    prefix = 'z9hG4bK'
    suffix_chars = random.choices(string.ascii_letters + string.digits, k=16)
    return prefix + ''.join(suffix_chars)
|
|
|
|
def create_register_request(target_host: str, target_port: int, extension: str, from_ip: str) -> bytes:
    """Build a body-less SIP REGISTER request for *extension* at the target.

    Args:
        target_host: Host placed in the Request-URI and in the From/To URIs.
        target_port: Port placed in the Request-URI.
        extension: User part of the From/To/Contact URIs (the account
            being "attacked").
        from_ip: Local IP address advertised in the Via, Call-ID and
            Contact headers.

    Returns:
        The encoded request: CRLF-terminated start line and headers,
        followed by the empty line that ends the header section.
    """
    call_id = generate_call_id()
    branch = generate_branch()
    tag = ''.join(random.choices(string.digits, k=8))

    address_of_record = f"sip:{extension}@{target_host}"
    lines = [
        f"REGISTER sip:{target_host}:{target_port} SIP/2.0",
        # The request is sent from an ephemeral UDP port, not from the 5060
        # advertised here. Without ``rport`` (RFC 3581) a compliant server
        # replies to the Via sent-by port, so the reply would never reach
        # our socket and would be misread as "blocked".
        f"Via: SIP/2.0/UDP {from_ip}:5060;rport;branch={branch}",
        "Max-Forwards: 70",
        f"From: <{address_of_record}>;tag={tag}",
        f"To: <{address_of_record}>",
        f"Call-ID: {call_id}@{from_ip}",
        "CSeq: 1 REGISTER",
        f"Contact: <sip:{extension}@{from_ip}:5060>",
        "Expires: 3600",
        "User-Agent: BruteForcer/1.0",
        "Content-Length: 0",
    ]
    # Every line ends in CRLF; the extra CRLF terminates the header block.
    return ("\r\n".join(lines) + "\r\n\r\n").encode()
|
|
|
|
def send_register(sock: socket.socket, target: tuple, request: bytes) -> str:
    """Send one REGISTER datagram and return the reply as text.

    Args:
        sock: Datagram socket used for both sending and receiving. Its
            timeout is set to 2 seconds as a side effect.
        target: ``(host, port)`` address the request is sent to.
        request: Encoded SIP request.

    Returns:
        The decoded reply (up to 4096 bytes); the literal ``"TIMEOUT"`` if
        nothing arrived within 2 seconds; or ``"ERROR: <detail>"`` for any
        other failure while sending or receiving.
    """
    try:
        sock.sendto(request, target)
        sock.settimeout(2.0)
        response, _ = sock.recvfrom(4096)
    except socket.timeout:
        return "TIMEOUT"
    except Exception as e:
        # Best-effort probe: report the failure as text rather than abort
        # the whole simulation run.
        return f"ERROR: {e}"

    # Decode leniently: a single non-UTF-8 byte in some header must not turn
    # a genuine reply into "ERROR: ..." and hide its status line.
    return response.decode('utf-8', errors='replace')
|
|
|
|
def _parse_status_code(response: str):
    """Return the integer status code from a SIP response's status line.

    Returns ``None`` when *response* does not begin with a
    ``SIP/<version> <code> ...`` line, which includes the ``"TIMEOUT"`` and
    ``"ERROR: ..."`` sentinels produced by ``send_register``.
    """
    first_line = response.split('\r\n', 1)[0]
    fields = first_line.split(None, 2)
    if len(fields) < 2 or not fields[0].startswith('SIP/'):
        return None
    try:
        return int(fields[1])
    except ValueError:
        return None


def main():
    """Run the brute-force simulation from the command line.

    Sends ``--count`` REGISTER attempts for every extension in
    ``--extensions`` to the target and reports when the target stops
    answering (timeout) or answers 403, which is taken as evidence that
    SIP Guardian blocked the source.
    """
    parser = argparse.ArgumentParser(description='SIP Brute Force Simulator')
    parser.add_argument('target', help='Target host (Caddy proxy)')
    parser.add_argument('-p', '--port', type=int, default=5060, help='Target port')
    parser.add_argument('-e', '--extensions', default='100-105', help='Extension range (e.g., 100-200)')
    parser.add_argument('-c', '--count', type=int, default=10, help='Attempts per extension')
    parser.add_argument('-d', '--delay', type=float, default=0.1, help='Delay between attempts')
    parser.add_argument('--udp', action='store_true', default=True, help='Use UDP (default)')
    parser.add_argument('--tcp', action='store_true', help='Use TCP')
    args = parser.parse_args()

    if args.tcp:
        # The flag is accepted for compatibility, but only UDP is implemented;
        # say so instead of silently ignoring the request.
        print("[!] TCP transport is not implemented; using UDP")

    # Parse extension range ("N" or "N-M"); reject malformed or empty ranges
    # with a usage error instead of a traceback or a zero-attempt run.
    try:
        if '-' in args.extensions:
            start, end = map(int, args.extensions.split('-'))
            if start > end:
                raise ValueError("range start exceeds end")
            extensions = list(range(start, end + 1))
        else:
            extensions = [int(args.extensions)]
    except ValueError:
        parser.error(f"invalid extension range {args.extensions!r} (expected N or N-M)")

    # Get our IP: connecting a UDP socket sends nothing but makes the OS pick
    # the local address it would route from.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        probe.connect((args.target, args.port))
        from_ip = probe.getsockname()[0]

    print("[*] Starting brute force simulation")
    print(f"[*] Target: {args.target}:{args.port}")
    print(f"[*] Source IP: {from_ip}")
    print(f"[*] Extensions: {extensions}")
    print(f"[*] Attempts per extension: {args.count}")
    print()

    target = (args.target, args.port)
    attempts = 0
    blocked_at = None  # attempt number of the first sign of blocking

    # The context manager closes the socket on every exit path, including
    # the early return taken when a timeout is observed.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        for ext in extensions:
            for i in range(args.count):
                request = create_register_request(args.target, args.port, str(ext), from_ip)
                response = send_register(sock, target, request)
                attempts += 1

                # Classify by the status line only. Searching the whole
                # message would also match digits inside the random tag,
                # branch or Call-ID echoed back in the headers.
                status = _parse_status_code(response)

                if response == 'TIMEOUT':
                    if blocked_at is None:
                        blocked_at = attempts
                    print(f"[!] BLOCKED after {attempts} attempts (extension {ext}, attempt {i+1})")
                    print(f"[+] SIP Guardian is working! Blocked after {blocked_at} attempts")
                    return
                elif status in (401, 407):
                    print(f"[*] Auth required: ext={ext} attempt={i+1} total={attempts}")
                elif status == 403:
                    print(f"[!] FORBIDDEN: ext={ext} - Connection blocked")
                    if blocked_at is None:
                        blocked_at = attempts
                else:
                    # Print first line of response
                    first_line = response.split('\r\n')[0] if response else 'No response'
                    print(f"[?] Response: {first_line} (ext={ext})")

                time.sleep(args.delay)

    if blocked_at is not None:
        # A 403 was seen even though the target never went silent; that
        # still counts as being blocked.
        print(f"\n[*] Completed {attempts} attempts; first 403 Forbidden at attempt {blocked_at}")
        print(f"[+] SIP Guardian is working! Blocked after {blocked_at} attempts")
    else:
        print(f"\n[*] Completed {attempts} attempts without being blocked")
        print("[!] SIP Guardian may not be working correctly")
|
|
|
|
# Script entry point: run the simulation only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
|