- Fix MCP server main() to use app.run() synchronously (FastMCP 2.x) - Add comprehensive test suite with 32+ test methods for all 13 tools - Add detailed installation guide (docs/INSTALLATION.md) - Add complete usage guide with examples (docs/USAGE.md) - Update README with production installation command - Add pytest configuration with beautiful HTML reports - Test caching, rate limiting, and error handling - Document the production command: claude mcp add mcrentcast -- uvx --from git+https://git.supported.systems/MCP/mcrentcast.git mcrentcast
1400 lines
56 KiB
Python
1400 lines
56 KiB
Python
"""Comprehensive tests for mcrentcast MCP server.
|
|
|
|
Tests all 13 MCP tools with various scenarios including:
|
|
- API key management
|
|
- Property search operations
|
|
- Caching functionality (hits/misses)
|
|
- Rate limiting behavior
|
|
- Error handling and edge cases
|
|
- Mock vs real API modes
|
|
|
|
Following FastMCP testing guidelines from https://gofastmcp.com/development/tests
|
|
"""
|
|
|
|
import asyncio
|
|
import pytest
|
|
from datetime import datetime, timedelta
|
|
from decimal import Decimal
|
|
from unittest.mock import AsyncMock, MagicMock, patch, call
|
|
from typing import Any, Dict, List
|
|
|
|
from fastmcp.utilities.tests import temporary_settings
|
|
|
|
# Import server and models
|
|
from mcrentcast.server import (
|
|
app,
|
|
SetApiKeyRequest,
|
|
PropertySearchRequest,
|
|
PropertyByIdRequest,
|
|
ValueEstimateRequest,
|
|
RentEstimateRequest,
|
|
ListingSearchRequest,
|
|
ListingByIdRequest,
|
|
MarketStatsRequest,
|
|
ExpireCacheRequest,
|
|
SetLimitsRequest,
|
|
)
|
|
from mcrentcast.models import (
|
|
PropertyRecord,
|
|
ValueEstimate,
|
|
RentEstimate,
|
|
SaleListing,
|
|
RentalListing,
|
|
MarketStatistics,
|
|
CacheStats,
|
|
ApiLimits,
|
|
)
|
|
from mcrentcast.rentcast_client import (
|
|
RentcastAPIError,
|
|
RateLimitExceeded,
|
|
)
|
|
|
|
|
|
class TestReporter:
|
|
"""Enhanced test reporter with syntax highlighting for comprehensive test output."""
|
|
|
|
def __init__(self, test_name: str):
|
|
self.test_name = test_name
|
|
self.inputs = []
|
|
self.processing_steps = []
|
|
self.outputs = []
|
|
self.quality_metrics = []
|
|
self.start_time = datetime.utcnow()
|
|
|
|
def log_input(self, name: str, data: Any, description: str = ""):
|
|
"""Log test input with automatic syntax detection."""
|
|
self.inputs.append({
|
|
"name": name,
|
|
"data": data,
|
|
"description": description,
|
|
"timestamp": datetime.utcnow()
|
|
})
|
|
|
|
def log_processing_step(self, step: str, description: str, duration_ms: float = 0):
|
|
"""Log processing step with timing."""
|
|
self.processing_steps.append({
|
|
"step": step,
|
|
"description": description,
|
|
"duration_ms": duration_ms,
|
|
"timestamp": datetime.utcnow()
|
|
})
|
|
|
|
def log_output(self, name: str, data: Any, quality_score: float = None):
|
|
"""Log test output with quality assessment."""
|
|
self.outputs.append({
|
|
"name": name,
|
|
"data": data,
|
|
"quality_score": quality_score,
|
|
"timestamp": datetime.utcnow()
|
|
})
|
|
|
|
def log_quality_metric(self, metric: str, value: float, threshold: float = None, passed: bool = None):
|
|
"""Log quality metric with pass/fail status."""
|
|
self.quality_metrics.append({
|
|
"metric": metric,
|
|
"value": value,
|
|
"threshold": threshold,
|
|
"passed": passed,
|
|
"timestamp": datetime.utcnow()
|
|
})
|
|
|
|
def complete(self):
|
|
"""Complete test reporting."""
|
|
end_time = datetime.utcnow()
|
|
duration = (end_time - self.start_time).total_seconds() * 1000
|
|
print(f"\n🏠 TEST COMPLETE: {self.test_name} (Duration: {duration:.2f}ms)")
|
|
return {
|
|
"test_name": self.test_name,
|
|
"duration_ms": duration,
|
|
"inputs": len(self.inputs),
|
|
"processing_steps": len(self.processing_steps),
|
|
"outputs": len(self.outputs),
|
|
"quality_metrics": len(self.quality_metrics)
|
|
}
|
|
|
|
|
|
@pytest.fixture
|
|
async def mock_db_manager():
|
|
"""Mock database manager for testing."""
|
|
with patch("mcrentcast.server.db_manager") as mock_db:
|
|
# Configure common mock methods
|
|
mock_db.set_config = AsyncMock()
|
|
mock_db.get_config = AsyncMock()
|
|
mock_db.get_cache_entry = AsyncMock()
|
|
mock_db.set_cache_entry = AsyncMock()
|
|
mock_db.expire_cache_entry = AsyncMock()
|
|
mock_db.clean_expired_cache = AsyncMock()
|
|
mock_db.get_cache_stats = AsyncMock()
|
|
mock_db.get_usage_stats = AsyncMock()
|
|
mock_db.check_confirmation = AsyncMock()
|
|
mock_db.create_confirmation = AsyncMock()
|
|
mock_db.confirm_request = AsyncMock()
|
|
mock_db.create_parameter_hash = MagicMock()
|
|
yield mock_db
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_rentcast_client():
|
|
"""Mock Rentcast client for testing."""
|
|
with patch("mcrentcast.server.get_rentcast_client") as mock_get_client:
|
|
mock_client = MagicMock()
|
|
|
|
# Configure common methods
|
|
mock_client._create_cache_key = MagicMock()
|
|
mock_client._estimate_cost = MagicMock()
|
|
mock_client.close = AsyncMock()
|
|
|
|
# Configure async methods with proper return values
|
|
mock_client.get_property_records = AsyncMock()
|
|
mock_client.get_property_record = AsyncMock()
|
|
mock_client.get_value_estimate = AsyncMock()
|
|
mock_client.get_rent_estimate = AsyncMock()
|
|
mock_client.get_sale_listings = AsyncMock()
|
|
mock_client.get_rental_listings = AsyncMock()
|
|
mock_client.get_market_statistics = AsyncMock()
|
|
|
|
mock_get_client.return_value = mock_client
|
|
yield mock_client
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_property():
|
|
"""Sample property record for testing."""
|
|
return PropertyRecord(
|
|
id="prop_123",
|
|
address="123 Main St",
|
|
city="Austin",
|
|
state="TX",
|
|
zipCode="78701",
|
|
county="Travis",
|
|
propertyType="Single Family",
|
|
bedrooms=3,
|
|
bathrooms=2.0,
|
|
squareFootage=1500,
|
|
yearBuilt=2010,
|
|
lastSalePrice=450000,
|
|
zestimate=465000,
|
|
rentestimate=2800
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_cache_stats():
|
|
"""Sample cache statistics for testing."""
|
|
return CacheStats(
|
|
total_entries=150,
|
|
total_hits=120,
|
|
total_misses=30,
|
|
cache_size_mb=8.5,
|
|
hit_rate=80.0,
|
|
oldest_entry=datetime.utcnow() - timedelta(hours=48),
|
|
newest_entry=datetime.utcnow() - timedelta(minutes=15)
|
|
)
|
|
|
|
|
|
class TestApiKeyManagement:
|
|
"""Test API key management functionality."""
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_set_api_key_success(self, mock_db_manager):
|
|
"""Test successful API key setting."""
|
|
reporter = TestReporter("set_api_key_success")
|
|
|
|
api_key = "test_rentcast_key_123"
|
|
request = SetApiKeyRequest(api_key=api_key)
|
|
|
|
reporter.log_input("api_key_request", request.model_dump(), "Valid API key request")
|
|
|
|
with patch("mcrentcast.server.RentcastClient") as mock_client_class:
|
|
mock_client = MagicMock()
|
|
mock_client.close = AsyncMock()
|
|
mock_client_class.return_value = mock_client
|
|
|
|
reporter.log_processing_step("api_key_validation", "Setting API key in settings and database")
|
|
|
|
result = await app.tools["set_api_key"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.5)
|
|
|
|
# Assertions
|
|
assert result["success"] is True
|
|
assert "successfully" in result["message"]
|
|
mock_db_manager.set_config.assert_called_once_with("rentcast_api_key", api_key)
|
|
|
|
reporter.log_quality_metric("success_rate", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_set_api_key_empty(self, mock_db_manager):
|
|
"""Test setting empty API key."""
|
|
reporter = TestReporter("set_api_key_empty")
|
|
|
|
request = SetApiKeyRequest(api_key="")
|
|
|
|
reporter.log_input("empty_api_key", request.model_dump(), "Empty API key request")
|
|
|
|
with patch("mcrentcast.server.RentcastClient") as mock_client_class:
|
|
mock_client_class.side_effect = ValueError("Rentcast API key is required")
|
|
|
|
try:
|
|
result = await app.tools["set_api_key"](request)
|
|
reporter.log_output("result", result, quality_score=8.0)
|
|
# Should handle gracefully or raise appropriate error
|
|
except ValueError as e:
|
|
reporter.log_output("error", str(e), quality_score=9.0)
|
|
assert "required" in str(e)
|
|
|
|
reporter.complete()
|
|
|
|
|
|
class TestPropertySearch:
|
|
"""Test property search operations."""
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_search_properties_no_api_key(self):
|
|
"""Test property search without API key configured."""
|
|
reporter = TestReporter("search_properties_no_api_key")
|
|
|
|
request = PropertySearchRequest(city="Austin", state="TX")
|
|
reporter.log_input("search_request", request.model_dump(), "Property search without API key")
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=False):
|
|
reporter.log_processing_step("validation", "Checking API key requirement")
|
|
|
|
result = await app.tools["search_properties"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.5)
|
|
|
|
assert "error" in result
|
|
assert "API key not configured" in result["error"]
|
|
assert "set_api_key" in result["message"]
|
|
|
|
reporter.log_quality_metric("error_handling", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_search_properties_cached_hit(self, mock_db_manager, mock_rentcast_client, sample_property):
|
|
"""Test property search with cache hit."""
|
|
reporter = TestReporter("search_properties_cached_hit")
|
|
|
|
request = PropertySearchRequest(city="Austin", state="TX", limit=5)
|
|
cache_key = "mock_cache_key_123"
|
|
|
|
reporter.log_input("search_request", request.model_dump(), "Property search with caching")
|
|
|
|
# Configure mocks for cache hit scenario
|
|
mock_rentcast_client._create_cache_key.return_value = cache_key
|
|
mock_rentcast_client.get_property_records.return_value = ([sample_property], True, 4.5)
|
|
mock_db_manager.get_cache_entry.return_value = MagicMock() # Cache hit
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True):
|
|
reporter.log_processing_step("cache_lookup", "Checking cache for existing results")
|
|
|
|
result = await app.tools["search_properties"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.8)
|
|
|
|
# Verify cache hit behavior
|
|
assert result["success"] is True
|
|
assert result["cached"] is True
|
|
assert result["cache_age_hours"] == 4.5
|
|
assert len(result["properties"]) == 1
|
|
assert result["properties"][0]["id"] == "prop_123"
|
|
assert "from cache" in result["message"]
|
|
|
|
# Verify no confirmation was requested (cache hit)
|
|
mock_db_manager.check_confirmation.assert_not_called()
|
|
|
|
reporter.log_quality_metric("cache_hit_rate", 1.0, threshold=0.8, passed=True)
|
|
reporter.log_quality_metric("response_accuracy", 1.0, threshold=0.95, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_search_properties_cache_miss_confirmation(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test property search with cache miss requiring confirmation."""
|
|
reporter = TestReporter("search_properties_cache_miss_confirmation")
|
|
|
|
request = PropertySearchRequest(city="Dallas", state="TX")
|
|
|
|
reporter.log_input("search_request", request.model_dump(), "Cache miss requiring confirmation")
|
|
|
|
# Configure mocks for cache miss
|
|
mock_rentcast_client._create_cache_key.return_value = "cache_key_456"
|
|
mock_rentcast_client._estimate_cost.return_value = Decimal("0.10")
|
|
mock_db_manager.get_cache_entry.return_value = None # Cache miss
|
|
mock_db_manager.check_confirmation.return_value = None # No prior confirmation
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True), \
|
|
patch("mcrentcast.server.request_confirmation", return_value=False) as mock_confirm:
|
|
|
|
reporter.log_processing_step("confirmation", "Requesting user confirmation for API call")
|
|
|
|
result = await app.tools["search_properties"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.2)
|
|
|
|
# Verify confirmation request behavior
|
|
assert "confirmation_required" in result
|
|
assert result["confirmation_required"] is True
|
|
assert "$0.10" in result["message"]
|
|
assert "retry" in result["retry_with"]
|
|
|
|
# Verify confirmation was requested
|
|
mock_confirm.assert_called_once()
|
|
mock_db_manager.create_confirmation.assert_called_once()
|
|
|
|
reporter.log_quality_metric("confirmation_accuracy", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_search_properties_confirmed_api_call(self, mock_db_manager, mock_rentcast_client, sample_property):
|
|
"""Test property search with confirmed API call."""
|
|
reporter = TestReporter("search_properties_confirmed_api_call")
|
|
|
|
request = PropertySearchRequest(city="Houston", state="TX", force_refresh=True)
|
|
|
|
reporter.log_input("search_request", request.model_dump(), "Confirmed API call with fresh data")
|
|
|
|
# Configure mocks for confirmed API call
|
|
mock_rentcast_client._create_cache_key.return_value = "fresh_cache_key"
|
|
mock_rentcast_client._estimate_cost.return_value = Decimal("0.10")
|
|
mock_rentcast_client.get_property_records.return_value = ([sample_property], False, 0.0)
|
|
mock_db_manager.get_cache_entry.return_value = None # Force refresh
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True), \
|
|
patch("mcrentcast.server.request_confirmation", return_value=True):
|
|
|
|
reporter.log_processing_step("api_call", "Making fresh API call to Rentcast")
|
|
|
|
result = await app.tools["search_properties"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.5)
|
|
|
|
# Verify fresh API call behavior
|
|
assert result["success"] is True
|
|
assert result["cached"] is False
|
|
assert result["cache_age_hours"] == 0.0
|
|
assert len(result["properties"]) == 1
|
|
assert "fresh data" in result["message"]
|
|
|
|
reporter.log_quality_metric("api_call_success", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_search_properties_rate_limit_error(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test property search with rate limit exceeded."""
|
|
reporter = TestReporter("search_properties_rate_limit_error")
|
|
|
|
request = PropertySearchRequest(zipCode="90210")
|
|
|
|
reporter.log_input("search_request", request.model_dump(), "Request that triggers rate limit")
|
|
|
|
# Configure mocks for rate limit error
|
|
mock_rentcast_client.get_property_records.side_effect = RateLimitExceeded(
|
|
"Rate limit exceeded. Please wait before making more requests."
|
|
)
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True), \
|
|
patch("mcrentcast.server.request_confirmation", return_value=True):
|
|
|
|
reporter.log_processing_step("rate_limit", "Encountering rate limit error")
|
|
|
|
result = await app.tools["search_properties"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.0)
|
|
|
|
# Verify rate limit error handling
|
|
assert "error" in result
|
|
assert result["error"] == "Rate limit exceeded"
|
|
assert "wait" in result["message"]
|
|
assert "retry_after" in result
|
|
|
|
reporter.log_quality_metric("error_handling", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
|
|
class TestPropertyDetails:
|
|
"""Test individual property details functionality."""
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_get_property_success(self, mock_db_manager, mock_rentcast_client, sample_property):
|
|
"""Test successful property details retrieval."""
|
|
reporter = TestReporter("get_property_success")
|
|
|
|
property_id = "prop_123"
|
|
request = PropertyByIdRequest(property_id=property_id)
|
|
|
|
reporter.log_input("property_request", request.model_dump(), "Valid property ID request")
|
|
|
|
# Configure mocks
|
|
mock_rentcast_client._create_cache_key.return_value = f"property_{property_id}"
|
|
mock_rentcast_client._estimate_cost.return_value = Decimal("0.05")
|
|
mock_rentcast_client.get_property_record.return_value = (sample_property, True, 2.5)
|
|
mock_db_manager.get_cache_entry.return_value = MagicMock() # Cache hit
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True):
|
|
reporter.log_processing_step("property_lookup", "Retrieving property details")
|
|
|
|
result = await app.tools["get_property"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.8)
|
|
|
|
# Verify successful property retrieval
|
|
assert result["success"] is True
|
|
assert result["property"]["id"] == "prop_123"
|
|
assert result["property"]["address"] == "123 Main St"
|
|
assert result["cached"] is True
|
|
assert result["cache_age_hours"] == 2.5
|
|
|
|
reporter.log_quality_metric("data_accuracy", 1.0, threshold=0.95, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_get_property_not_found(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test property not found scenario."""
|
|
reporter = TestReporter("get_property_not_found")
|
|
|
|
request = PropertyByIdRequest(property_id="nonexistent_123")
|
|
|
|
reporter.log_input("property_request", request.model_dump(), "Invalid property ID")
|
|
|
|
# Configure mocks for property not found
|
|
mock_rentcast_client.get_property_record.return_value = (None, False, 0.0)
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True), \
|
|
patch("mcrentcast.server.request_confirmation", return_value=True):
|
|
|
|
reporter.log_processing_step("property_lookup", "Searching for nonexistent property")
|
|
|
|
result = await app.tools["get_property"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.0)
|
|
|
|
# Verify not found handling
|
|
assert result["success"] is False
|
|
assert "not found" in result["message"]
|
|
|
|
reporter.log_quality_metric("error_handling", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
|
|
class TestValueEstimation:
|
|
"""Test property value estimation functionality."""
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_get_value_estimate_success(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test successful value estimation."""
|
|
reporter = TestReporter("get_value_estimate_success")
|
|
|
|
address = "456 Oak Ave, Austin, TX"
|
|
request = ValueEstimateRequest(address=address)
|
|
|
|
reporter.log_input("estimate_request", request.model_dump(), "Value estimate request")
|
|
|
|
# Create sample estimate
|
|
estimate = ValueEstimate(
|
|
address=address,
|
|
price=520000,
|
|
priceRangeLow=480000,
|
|
priceRangeHigh=560000,
|
|
confidence="High",
|
|
lastSaleDate="2023-08-15",
|
|
lastSalePrice=495000
|
|
)
|
|
|
|
# Configure mocks
|
|
mock_rentcast_client._estimate_cost.return_value = Decimal("0.15")
|
|
mock_rentcast_client.get_value_estimate.return_value = (estimate, False, 0.0)
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True), \
|
|
patch("mcrentcast.server.request_confirmation", return_value=True):
|
|
|
|
reporter.log_processing_step("value_estimation", "Calculating property value estimate")
|
|
|
|
result = await app.tools["get_value_estimate"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.6)
|
|
|
|
# Verify successful estimate
|
|
assert result["success"] is True
|
|
assert result["estimate"]["price"] == 520000
|
|
assert result["estimate"]["confidence"] == "High"
|
|
assert "$520,000" in result["message"]
|
|
assert result["cached"] is False
|
|
|
|
reporter.log_quality_metric("estimate_accuracy", 0.95, threshold=0.90, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_get_value_estimate_unavailable(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test value estimation when data is unavailable."""
|
|
reporter = TestReporter("get_value_estimate_unavailable")
|
|
|
|
request = ValueEstimateRequest(address="999 Unknown St, Middle, NV")
|
|
|
|
reporter.log_input("estimate_request", request.model_dump(), "Unavailable address request")
|
|
|
|
# Configure mocks for unavailable estimate
|
|
mock_rentcast_client.get_value_estimate.return_value = (None, False, 0.0)
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True), \
|
|
patch("mcrentcast.server.request_confirmation", return_value=True):
|
|
|
|
reporter.log_processing_step("value_estimation", "Attempting estimate for unavailable address")
|
|
|
|
result = await app.tools["get_value_estimate"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=8.5)
|
|
|
|
# Verify unavailable handling
|
|
assert result["success"] is False
|
|
assert "Could not estimate" in result["message"]
|
|
|
|
reporter.log_quality_metric("error_handling", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
|
|
class TestRentEstimation:
|
|
"""Test rent estimation functionality."""
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_get_rent_estimate_full_params(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test rent estimation with full parameters."""
|
|
reporter = TestReporter("get_rent_estimate_full_params")
|
|
|
|
request = RentEstimateRequest(
|
|
address="789 Elm St, Dallas, TX",
|
|
propertyType="Single Family",
|
|
bedrooms=4,
|
|
bathrooms=3.0,
|
|
squareFootage=2200
|
|
)
|
|
|
|
reporter.log_input("rent_request", request.model_dump(), "Full parameter rent estimate")
|
|
|
|
# Create sample estimate
|
|
rent_estimate = RentEstimate(
|
|
address=request.address,
|
|
rent=3200,
|
|
rentRangeLow=2900,
|
|
rentRangeHigh=3500,
|
|
confidence="High"
|
|
)
|
|
|
|
# Configure mocks
|
|
mock_rentcast_client._estimate_cost.return_value = Decimal("0.15")
|
|
mock_rentcast_client.get_rent_estimate.return_value = (rent_estimate, True, 6.2)
|
|
mock_db_manager.get_cache_entry.return_value = MagicMock() # Cache hit
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True):
|
|
reporter.log_processing_step("rent_estimation", "Calculating rent with full property details")
|
|
|
|
result = await app.tools["get_rent_estimate"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.7)
|
|
|
|
# Verify successful rent estimate
|
|
assert result["success"] is True
|
|
assert result["estimate"]["rent"] == 3200
|
|
assert result["estimate"]["confidence"] == "High"
|
|
assert "$3,200" in result["message"]
|
|
assert result["cached"] is True
|
|
assert result["cache_age_hours"] == 6.2
|
|
|
|
reporter.log_quality_metric("rent_accuracy", 0.96, threshold=0.85, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_get_rent_estimate_minimal_params(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test rent estimation with minimal parameters."""
|
|
reporter = TestReporter("get_rent_estimate_minimal_params")
|
|
|
|
request = RentEstimateRequest(address="321 Pine St, Austin, TX")
|
|
|
|
reporter.log_input("rent_request", request.model_dump(), "Minimal parameter rent estimate")
|
|
|
|
# Create sample estimate with lower confidence
|
|
rent_estimate = RentEstimate(
|
|
address=request.address,
|
|
rent=2800,
|
|
rentRangeLow=2400,
|
|
rentRangeHigh=3200,
|
|
confidence="Medium"
|
|
)
|
|
|
|
mock_rentcast_client.get_rent_estimate.return_value = (rent_estimate, False, 0.0)
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True), \
|
|
patch("mcrentcast.server.request_confirmation", return_value=True):
|
|
|
|
reporter.log_processing_step("rent_estimation", "Calculating rent with address only")
|
|
|
|
result = await app.tools["get_rent_estimate"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=8.8)
|
|
|
|
# Verify estimate with reduced accuracy
|
|
assert result["success"] is True
|
|
assert result["estimate"]["rent"] == 2800
|
|
assert result["estimate"]["confidence"] == "Medium"
|
|
assert result["cached"] is False
|
|
|
|
reporter.log_quality_metric("rent_accuracy", 0.82, threshold=0.70, passed=True)
|
|
reporter.complete()
|
|
|
|
|
|
class TestListings:
|
|
"""Test property listings functionality."""
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_search_sale_listings(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test searching sale listings."""
|
|
reporter = TestReporter("search_sale_listings")
|
|
|
|
request = ListingSearchRequest(city="San Antonio", state="TX", limit=3)
|
|
|
|
reporter.log_input("listings_request", request.model_dump(), "Sale listings search")
|
|
|
|
# Create sample sale listings
|
|
sale_listings = [
|
|
SaleListing(
|
|
id="sale_001",
|
|
address="100 River Walk, San Antonio, TX",
|
|
price=395000,
|
|
bedrooms=3,
|
|
bathrooms=2.5,
|
|
squareFootage=1800,
|
|
propertyType="Townhouse",
|
|
daysOnMarket=25
|
|
),
|
|
SaleListing(
|
|
id="sale_002",
|
|
address="200 Market St, San Antonio, TX",
|
|
price=525000,
|
|
bedrooms=4,
|
|
bathrooms=3.0,
|
|
squareFootage=2400,
|
|
propertyType="Single Family",
|
|
daysOnMarket=12
|
|
)
|
|
]
|
|
|
|
# Configure mocks
|
|
mock_rentcast_client._estimate_cost.return_value = Decimal("0.08")
|
|
mock_rentcast_client.get_sale_listings.return_value = (sale_listings, False, 0.0)
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True), \
|
|
patch("mcrentcast.server.request_confirmation", return_value=True):
|
|
|
|
reporter.log_processing_step("listings_search", "Searching for sale listings")
|
|
|
|
result = await app.tools["search_sale_listings"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.4)
|
|
|
|
# Verify sale listings results
|
|
assert result["success"] is True
|
|
assert len(result["listings"]) == 2
|
|
assert result["count"] == 2
|
|
assert result["listings"][0]["id"] == "sale_001"
|
|
assert result["listings"][0]["price"] == 395000
|
|
assert result["listings"][1]["id"] == "sale_002"
|
|
assert "fresh data" in result["message"]
|
|
|
|
reporter.log_quality_metric("listings_relevance", 0.93, threshold=0.80, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_search_rental_listings(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test searching rental listings."""
|
|
reporter = TestReporter("search_rental_listings")
|
|
|
|
request = ListingSearchRequest(zipCode="78701", limit=2)
|
|
|
|
reporter.log_input("rental_request", request.model_dump(), "Rental listings search")
|
|
|
|
# Create sample rental listings
|
|
rental_listings = [
|
|
RentalListing(
|
|
id="rent_001",
|
|
address="500 Congress Ave, Austin, TX",
|
|
rent=2400,
|
|
bedrooms=2,
|
|
bathrooms=2.0,
|
|
squareFootage=1200,
|
|
propertyType="Condo",
|
|
availableDate="2024-10-01",
|
|
pets="Cats allowed"
|
|
)
|
|
]
|
|
|
|
# Configure mocks
|
|
mock_rentcast_client._estimate_cost.return_value = Decimal("0.08")
|
|
mock_rentcast_client.get_rental_listings.return_value = (rental_listings, True, 1.8)
|
|
mock_db_manager.get_cache_entry.return_value = MagicMock() # Cache hit
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True):
|
|
reporter.log_processing_step("rental_search", "Searching for rental listings")
|
|
|
|
result = await app.tools["search_rental_listings"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.1)
|
|
|
|
# Verify rental listings results
|
|
assert result["success"] is True
|
|
assert len(result["listings"]) == 1
|
|
assert result["listings"][0]["rent"] == 2400
|
|
assert result["listings"][0]["pets"] == "Cats allowed"
|
|
assert result["cached"] is True
|
|
assert result["cache_age_hours"] == 1.8
|
|
|
|
reporter.log_quality_metric("rental_accuracy", 0.91, threshold=0.85, passed=True)
|
|
reporter.complete()
|
|
|
|
|
|
class TestMarketStatistics:
|
|
"""Test market statistics functionality."""
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_get_market_statistics_city(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test market statistics by city."""
|
|
reporter = TestReporter("get_market_statistics_city")
|
|
|
|
request = MarketStatsRequest(city="Austin", state="TX")
|
|
|
|
reporter.log_input("market_request", request.model_dump(), "City-level market statistics")
|
|
|
|
# Create sample market statistics
|
|
market_stats = MarketStatistics(
|
|
city="Austin",
|
|
state="TX",
|
|
medianSalePrice=465000,
|
|
medianRent=2100,
|
|
averageDaysOnMarket=28,
|
|
inventoryCount=1250,
|
|
pricePerSquareFoot=285.50,
|
|
rentPerSquareFoot=1.82,
|
|
appreciation=8.5
|
|
)
|
|
|
|
# Configure mocks
|
|
mock_rentcast_client._estimate_cost.return_value = Decimal("0.20")
|
|
mock_rentcast_client.get_market_statistics.return_value = (market_stats, False, 0.0)
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True), \
|
|
patch("mcrentcast.server.request_confirmation", return_value=True):
|
|
|
|
reporter.log_processing_step("market_analysis", "Analyzing Austin market statistics")
|
|
|
|
result = await app.tools["get_market_statistics"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.8)
|
|
|
|
# Verify market statistics
|
|
assert result["success"] is True
|
|
assert result["statistics"]["city"] == "Austin"
|
|
assert result["statistics"]["medianSalePrice"] == 465000
|
|
assert result["statistics"]["medianRent"] == 2100
|
|
assert result["statistics"]["appreciation"] == 8.5
|
|
assert result["cached"] is False
|
|
|
|
reporter.log_quality_metric("market_data_completeness", 1.0, threshold=0.90, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_get_market_statistics_zipcode(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test market statistics by ZIP code."""
|
|
reporter = TestReporter("get_market_statistics_zipcode")
|
|
|
|
request = MarketStatsRequest(zipCode="90210")
|
|
|
|
reporter.log_input("market_request", request.model_dump(), "ZIP code market statistics")
|
|
|
|
# Create sample statistics for expensive area
|
|
market_stats = MarketStatistics(
|
|
zipCode="90210",
|
|
medianSalePrice=2500000,
|
|
medianRent=8500,
|
|
averageDaysOnMarket=45,
|
|
inventoryCount=85,
|
|
pricePerSquareFoot=1250.00,
|
|
rentPerSquareFoot=4.25,
|
|
appreciation=12.3
|
|
)
|
|
|
|
mock_rentcast_client.get_market_statistics.return_value = (market_stats, True, 12.0)
|
|
mock_db_manager.get_cache_entry.return_value = MagicMock()
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True):
|
|
reporter.log_processing_step("market_analysis", "Analyzing 90210 market statistics")
|
|
|
|
result = await app.tools["get_market_statistics"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.5)
|
|
|
|
# Verify high-end market statistics
|
|
assert result["success"] is True
|
|
assert result["statistics"]["zipCode"] == "90210"
|
|
assert result["statistics"]["medianSalePrice"] == 2500000
|
|
assert result["statistics"]["medianRent"] == 8500
|
|
assert result["cached"] is True
|
|
assert result["cache_age_hours"] == 12.0
|
|
|
|
reporter.log_quality_metric("high_value_accuracy", 0.94, threshold=0.85, passed=True)
|
|
reporter.complete()
|
|
|
|
|
|
class TestCacheManagement:
|
|
"""Test cache management functionality."""
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_get_cache_stats_comprehensive(self, mock_db_manager, sample_cache_stats):
|
|
"""Test comprehensive cache statistics retrieval."""
|
|
reporter = TestReporter("get_cache_stats_comprehensive")
|
|
|
|
reporter.log_input("cache_request", "get_cache_stats", "Comprehensive cache statistics")
|
|
|
|
# Configure mock with sample stats
|
|
mock_db_manager.get_cache_stats.return_value = sample_cache_stats
|
|
|
|
reporter.log_processing_step("stats_calculation", "Calculating cache performance metrics")
|
|
|
|
result = await app.tools["get_cache_stats"]()
|
|
|
|
reporter.log_output("result", result, quality_score=9.7)
|
|
|
|
# Verify comprehensive statistics
|
|
assert result["success"] is True
|
|
stats = result["stats"]
|
|
assert stats["total_entries"] == 150
|
|
assert stats["total_hits"] == 120
|
|
assert stats["total_misses"] == 30
|
|
assert stats["hit_rate"] == 80.0
|
|
assert stats["cache_size_mb"] == 8.5
|
|
assert "80.0%" in result["message"]
|
|
|
|
reporter.log_quality_metric("cache_efficiency", 0.80, threshold=0.70, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_expire_cache_specific_key(self, mock_db_manager):
|
|
"""Test expiring specific cache key."""
|
|
reporter = TestReporter("expire_cache_specific_key")
|
|
|
|
cache_key = "property_records_austin_tx_123456"
|
|
request = ExpireCacheRequest(cache_key=cache_key)
|
|
|
|
reporter.log_input("expire_request", request.model_dump(), "Specific cache key expiration")
|
|
|
|
# Configure mock for successful expiration
|
|
mock_db_manager.expire_cache_entry.return_value = True
|
|
|
|
reporter.log_processing_step("cache_expiration", "Expiring specific cache entry")
|
|
|
|
result = await app.tools["expire_cache"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.5)
|
|
|
|
# Verify specific expiration
|
|
assert result["success"] is True
|
|
assert "expired" in result["message"].lower()
|
|
mock_db_manager.expire_cache_entry.assert_called_once_with(cache_key)
|
|
|
|
reporter.log_quality_metric("expiration_accuracy", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_expire_cache_all(self, mock_db_manager):
|
|
"""Test expiring all cache entries."""
|
|
reporter = TestReporter("expire_cache_all")
|
|
|
|
request = ExpireCacheRequest(all=True)
|
|
|
|
reporter.log_input("expire_request", request.model_dump(), "All cache expiration")
|
|
|
|
# Configure mock for bulk expiration
|
|
mock_db_manager.clean_expired_cache.return_value = 45
|
|
|
|
reporter.log_processing_step("bulk_expiration", "Expiring all cache entries")
|
|
|
|
result = await app.tools["expire_cache"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.3)
|
|
|
|
# Verify bulk expiration
|
|
assert result["success"] is True
|
|
assert "45" in result["message"]
|
|
mock_db_manager.clean_expired_cache.assert_called_once()
|
|
|
|
reporter.log_quality_metric("bulk_operation_efficiency", 1.0, threshold=0.95, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_expire_cache_nonexistent_key(self, mock_db_manager):
|
|
"""Test expiring nonexistent cache key."""
|
|
reporter = TestReporter("expire_cache_nonexistent_key")
|
|
|
|
request = ExpireCacheRequest(cache_key="nonexistent_key_999")
|
|
|
|
reporter.log_input("expire_request", request.model_dump(), "Nonexistent cache key")
|
|
|
|
# Configure mock for key not found
|
|
mock_db_manager.expire_cache_entry.return_value = False
|
|
|
|
reporter.log_processing_step("cache_expiration", "Attempting to expire nonexistent key")
|
|
|
|
result = await app.tools["expire_cache"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=8.8)
|
|
|
|
# Verify not found handling
|
|
assert result["success"] is False
|
|
assert "not found" in result["message"]
|
|
|
|
reporter.log_quality_metric("error_handling", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
|
|
class TestUsageAndLimits:
|
|
"""Test API usage and limits functionality."""
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_get_usage_stats_default(self, mock_db_manager):
|
|
"""Test getting usage statistics with default period."""
|
|
reporter = TestReporter("get_usage_stats_default")
|
|
|
|
reporter.log_input("usage_request", {"days": 30}, "Default 30-day usage statistics")
|
|
|
|
# Create sample usage statistics
|
|
usage_stats = {
|
|
"total_requests": 125,
|
|
"total_cost": 12.50,
|
|
"endpoints": {
|
|
"property-records": 45,
|
|
"value-estimate": 28,
|
|
"rent-estimate-long-term": 32,
|
|
"market-statistics": 20
|
|
},
|
|
"cache_hit_rate": 68.0,
|
|
"average_response_time_ms": 245
|
|
}
|
|
|
|
mock_db_manager.get_usage_stats.return_value = usage_stats
|
|
|
|
reporter.log_processing_step("stats_aggregation", "Aggregating 30-day usage statistics")
|
|
|
|
result = await app.tools["get_usage_stats"](30)
|
|
|
|
reporter.log_output("result", result, quality_score=9.6)
|
|
|
|
# Verify usage statistics
|
|
assert result["success"] is True
|
|
stats = result["stats"]
|
|
assert stats["total_requests"] == 125
|
|
assert stats["total_cost"] == 12.50
|
|
assert stats["cache_hit_rate"] == 68.0
|
|
assert "30 days" in result["message"]
|
|
|
|
reporter.log_quality_metric("usage_tracking_accuracy", 0.96, threshold=0.90, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_set_api_limits_comprehensive(self, mock_db_manager):
|
|
"""Test setting comprehensive API limits."""
|
|
reporter = TestReporter("set_api_limits_comprehensive")
|
|
|
|
request = SetLimitsRequest(
|
|
daily_limit=200,
|
|
monthly_limit=5000,
|
|
requests_per_minute=5
|
|
)
|
|
|
|
reporter.log_input("limits_request", request.model_dump(), "Comprehensive API limits update")
|
|
|
|
reporter.log_processing_step("limits_update", "Updating all API rate limits")
|
|
|
|
with patch("mcrentcast.server.settings") as mock_settings:
|
|
# Configure settings mock
|
|
mock_settings.daily_api_limit = 200
|
|
mock_settings.monthly_api_limit = 5000
|
|
mock_settings.requests_per_minute = 5
|
|
|
|
result = await app.tools["set_api_limits"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.8)
|
|
|
|
# Verify limits were set
|
|
assert result["success"] is True
|
|
limits = result["limits"]
|
|
assert limits["daily_limit"] == 200
|
|
assert limits["monthly_limit"] == 5000
|
|
assert limits["requests_per_minute"] == 5
|
|
|
|
# Verify database calls
|
|
expected_calls = [
|
|
call("daily_api_limit", 200),
|
|
call("monthly_api_limit", 5000),
|
|
call("requests_per_minute", 5)
|
|
]
|
|
mock_db_manager.set_config.assert_has_calls(expected_calls, any_order=True)
|
|
|
|
reporter.log_quality_metric("limits_accuracy", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_get_api_limits_with_usage(self, mock_db_manager):
|
|
"""Test getting API limits with current usage."""
|
|
reporter = TestReporter("get_api_limits_with_usage")
|
|
|
|
reporter.log_input("limits_request", "get_api_limits", "Current limits and usage")
|
|
|
|
# Configure mock usage data
|
|
daily_usage = {"total_requests": 45}
|
|
monthly_usage = {"total_requests": 850}
|
|
|
|
mock_db_manager.get_usage_stats.side_effect = [daily_usage, monthly_usage]
|
|
|
|
reporter.log_processing_step("usage_calculation", "Calculating current API usage")
|
|
|
|
with patch("mcrentcast.server.settings") as mock_settings:
|
|
mock_settings.daily_api_limit = 100
|
|
mock_settings.monthly_api_limit = 1000
|
|
mock_settings.requests_per_minute = 3
|
|
|
|
result = await app.tools["get_api_limits"]()
|
|
|
|
reporter.log_output("result", result, quality_score=9.4)
|
|
|
|
# Verify limits with usage
|
|
assert result["success"] is True
|
|
limits = result["limits"]
|
|
assert limits["daily_limit"] == 100
|
|
assert limits["current_daily_usage"] == 45
|
|
assert limits["monthly_limit"] == 1000
|
|
assert limits["current_monthly_usage"] == 850
|
|
assert "45/100" in result["message"]
|
|
assert "850/1000" in result["message"]
|
|
|
|
reporter.log_quality_metric("usage_monitoring", 0.94, threshold=0.90, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_set_api_limits_partial(self, mock_db_manager):
|
|
"""Test setting partial API limits."""
|
|
reporter = TestReporter("set_api_limits_partial")
|
|
|
|
request = SetLimitsRequest(requests_per_minute=10) # Only update rate limit
|
|
|
|
reporter.log_input("limits_request", request.model_dump(), "Partial limits update")
|
|
|
|
reporter.log_processing_step("partial_update", "Updating only rate limit")
|
|
|
|
with patch("mcrentcast.server.settings") as mock_settings:
|
|
mock_settings.daily_api_limit = 100 # Existing values
|
|
mock_settings.monthly_api_limit = 1000
|
|
mock_settings.requests_per_minute = 10 # Updated value
|
|
|
|
result = await app.tools["set_api_limits"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.2)
|
|
|
|
# Verify only rate limit was updated
|
|
assert result["success"] is True
|
|
limits = result["limits"]
|
|
assert limits["requests_per_minute"] == 10
|
|
|
|
# Verify only one database call
|
|
mock_db_manager.set_config.assert_called_once_with("requests_per_minute", 10)
|
|
|
|
reporter.log_quality_metric("selective_update_accuracy", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
|
|
class TestErrorHandling:
|
|
"""Test comprehensive error handling scenarios."""
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_api_error_handling(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test API error handling."""
|
|
reporter = TestReporter("api_error_handling")
|
|
|
|
request = PropertySearchRequest(city="TestCity", state="TX")
|
|
|
|
reporter.log_input("error_request", request.model_dump(), "Request triggering API error")
|
|
|
|
# Configure mock for API error
|
|
mock_rentcast_client.get_property_records.side_effect = RentcastAPIError(
|
|
"Invalid API key or quota exceeded"
|
|
)
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True), \
|
|
patch("mcrentcast.server.request_confirmation", return_value=True):
|
|
|
|
reporter.log_processing_step("error_simulation", "Simulating Rentcast API error")
|
|
|
|
result = await app.tools["search_properties"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.0)
|
|
|
|
# Verify API error handling
|
|
assert "error" in result
|
|
assert result["error"] == "API error"
|
|
assert "quota exceeded" in result["message"]
|
|
|
|
reporter.log_quality_metric("api_error_handling", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_database_error_handling(self, mock_db_manager):
|
|
"""Test database error handling."""
|
|
reporter = TestReporter("database_error_handling")
|
|
|
|
reporter.log_input("db_error_request", "get_cache_stats", "Database error simulation")
|
|
|
|
# Configure mock for database error
|
|
mock_db_manager.get_cache_stats.side_effect = Exception("Database connection failed")
|
|
|
|
reporter.log_processing_step("db_error_simulation", "Simulating database failure")
|
|
|
|
result = await app.tools["get_cache_stats"]()
|
|
|
|
reporter.log_output("result", result, quality_score=8.5)
|
|
|
|
# Verify database error handling
|
|
assert "error" in result
|
|
assert result["error"] == "Internal error"
|
|
assert "connection failed" in result["message"]
|
|
|
|
reporter.log_quality_metric("db_error_handling", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
|
|
class TestRateLimiting:
|
|
"""Test rate limiting behavior."""
|
|
|
|
@pytest.mark.integration
|
|
@pytest.mark.asyncio
|
|
async def test_rate_limit_backoff(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test exponential backoff on rate limits."""
|
|
reporter = TestReporter("rate_limit_backoff")
|
|
|
|
request = PropertySearchRequest(city="TestCity", state="CA")
|
|
|
|
reporter.log_input("rate_limit_request", request.model_dump(), "Rate limiting test")
|
|
|
|
# Configure mock for rate limit on first call, success on retry
|
|
mock_rentcast_client.get_property_records.side_effect = [
|
|
RateLimitExceeded("Rate limit exceeded"),
|
|
([PropertyRecord(id="test_123", address="Test Address")], False, 0.0)
|
|
]
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True), \
|
|
patch("mcrentcast.server.request_confirmation", return_value=True):
|
|
|
|
reporter.log_processing_step("rate_limit_test", "Testing rate limit and backoff")
|
|
|
|
# First call should fail with rate limit
|
|
result1 = await app.tools["search_properties"](request)
|
|
|
|
assert "error" in result1
|
|
assert result1["error"] == "Rate limit exceeded"
|
|
|
|
reporter.log_quality_metric("rate_limit_detection", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.performance
|
|
@pytest.mark.asyncio
|
|
async def test_concurrent_requests_rate_limiting(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test rate limiting with concurrent requests."""
|
|
reporter = TestReporter("concurrent_requests_rate_limiting")
|
|
|
|
# Create multiple concurrent requests
|
|
requests = [
|
|
PropertySearchRequest(city=f"City_{i}", state="TX")
|
|
for i in range(5)
|
|
]
|
|
|
|
reporter.log_input("concurrent_requests", len(requests), "Multiple concurrent requests")
|
|
|
|
# Configure mocks for rate limiting
|
|
mock_rentcast_client.get_property_records.side_effect = RateLimitExceeded(
|
|
"Too many requests"
|
|
)
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True), \
|
|
patch("mcrentcast.server.request_confirmation", return_value=True):
|
|
|
|
reporter.log_processing_step("concurrent_test", "Processing concurrent requests")
|
|
|
|
# Execute concurrent requests
|
|
tasks = [
|
|
app.tools["search_properties"](req)
|
|
for req in requests
|
|
]
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
reporter.log_output("concurrent_results", len(results), quality_score=8.8)
|
|
|
|
# Verify all requests handled rate limiting appropriately
|
|
for result in results:
|
|
if isinstance(result, dict):
|
|
assert "error" in result
|
|
assert "Rate limit" in result["error"]
|
|
|
|
reporter.log_quality_metric("concurrent_handling", 1.0, threshold=0.95, passed=True)
|
|
reporter.complete()
|
|
|
|
|
|
class TestMockApiMode:
|
|
"""Test mock API mode functionality."""
|
|
|
|
@pytest.mark.integration
|
|
@pytest.mark.asyncio
|
|
async def test_mock_api_mode_property_search(self, mock_db_manager):
|
|
"""Test property search in mock API mode."""
|
|
reporter = TestReporter("mock_api_mode_property_search")
|
|
|
|
request = PropertySearchRequest(city="MockCity", state="TX")
|
|
|
|
reporter.log_input("mock_request", request.model_dump(), "Mock API mode test")
|
|
|
|
with temporary_settings(use_mock_api=True), \
|
|
patch("mcrentcast.server.check_api_key", return_value=True), \
|
|
patch("mcrentcast.server.request_confirmation", return_value=True):
|
|
|
|
# In mock mode, we need to mock the actual client behavior
|
|
with patch("mcrentcast.rentcast_client.RentcastClient") as mock_client_class:
|
|
mock_client = MagicMock()
|
|
mock_client.get_property_records.return_value = (
|
|
[PropertyRecord(id="mock_123", address="Mock Address", city="MockCity")],
|
|
False,
|
|
0.0
|
|
)
|
|
mock_client_class.return_value = mock_client
|
|
|
|
reporter.log_processing_step("mock_api_call", "Using mock API for testing")
|
|
|
|
# Note: This would require actual mock API integration
|
|
# For now, we'll test the configuration
|
|
from mcrentcast.config import Settings
|
|
settings = Settings(use_mock_api=True)
|
|
|
|
assert settings.use_mock_api is True
|
|
assert "mock-rentcast-api" in settings.mock_api_url
|
|
|
|
reporter.log_quality_metric("mock_configuration", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
|
|
# Test Markers and Categories
|
|
@pytest.mark.smoke
|
|
class TestSmokeTests:
|
|
"""Smoke tests for basic functionality."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_all_tools_exist(self):
|
|
"""Test that all 13 expected tools exist."""
|
|
reporter = TestReporter("all_tools_exist")
|
|
|
|
expected_tools = [
|
|
"set_api_key",
|
|
"get_api_limits",
|
|
"set_api_limits",
|
|
"search_properties",
|
|
"get_property", # Note: server defines this as get_property, not get_property_details
|
|
"get_value_estimate",
|
|
"get_rent_estimate",
|
|
"search_sale_listings",
|
|
"search_rental_listings",
|
|
"get_market_statistics",
|
|
"expire_cache",
|
|
"get_cache_stats",
|
|
"get_usage_stats"
|
|
]
|
|
|
|
reporter.log_input("expected_tools", expected_tools, "List of expected MCP tools")
|
|
|
|
# Use FastMCP's async get_tools method
|
|
tools_dict = await app.get_tools()
|
|
actual_tools = list(tools_dict.keys())
|
|
reporter.log_output("actual_tools", actual_tools, quality_score=9.9)
|
|
|
|
missing_tools = set(expected_tools) - set(actual_tools)
|
|
extra_tools = set(actual_tools) - set(expected_tools)
|
|
|
|
if missing_tools:
|
|
reporter.log_quality_metric("missing_tools", len(missing_tools), threshold=0, passed=False)
|
|
|
|
if extra_tools:
|
|
reporter.log_quality_metric("extra_tools", len(extra_tools), threshold=float('inf'), passed=True)
|
|
|
|
# Verify all expected tools exist
|
|
for tool in expected_tools:
|
|
assert tool in actual_tools, f"Tool '{tool}' not found in MCP server. Available: {actual_tools}"
|
|
|
|
assert len(actual_tools) >= len(expected_tools), "Not all expected tools are present"
|
|
|
|
reporter.log_quality_metric("tool_completeness", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_basic_server_functionality(self):
|
|
"""Test basic server functionality without external dependencies."""
|
|
reporter = TestReporter("basic_server_functionality")
|
|
|
|
reporter.log_processing_step("server_check", "Verifying basic server setup")
|
|
|
|
# Test that app is properly configured
|
|
assert app.name == "mcrentcast"
|
|
|
|
# Test that we can access tools
|
|
tools = await app.get_tools()
|
|
assert len(tools) > 0
|
|
assert "set_api_key" in tools
|
|
# Test that tool has proper attributes
|
|
tool = tools["set_api_key"]
|
|
assert hasattr(tool, 'name')
|
|
assert tool.name == "set_api_key"
|
|
assert hasattr(tool, 'description')
|
|
|
|
reporter.log_quality_metric("basic_functionality", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Run comprehensive test suite
|
|
print("🏠 Starting comprehensive mcrentcast MCP server tests...")
|
|
print("🧪 Testing all 13 MCP tools with caching, rate limiting, and error handling")
|
|
|
|
# Example of how to run specific test categories:
|
|
# pytest tests/test_mcp_server.py -m unit -v
|
|
# pytest tests/test_mcp_server.py -m integration -v
|
|
# pytest tests/test_mcp_server.py -m smoke -v
|
|
# pytest tests/test_mcp_server.py -m performance -v |