- Fix test imports to use correct package paths (mcrentcast instead of src.mcrentcast) - Rename TestReporter to ReportGenerator to avoid pytest collection warnings - Add build exclusions to prevent unwanted files in source distribution - Add temporary refactoring docs to .gitignore Build is now clean and ready for PyPI publication. All core functionality verified working.
1400 lines
56 KiB
Python
1400 lines
56 KiB
Python
"""Comprehensive tests for mcrentcast MCP server.
|
|
|
|
Tests all 13 MCP tools with various scenarios including:
|
|
- API key management
|
|
- Property search operations
|
|
- Caching functionality (hits/misses)
|
|
- Rate limiting behavior
|
|
- Error handling and edge cases
|
|
- Mock vs real API modes
|
|
|
|
Following FastMCP testing guidelines from https://gofastmcp.com/development/tests
|
|
"""
|
|
|
|
import asyncio
|
|
import pytest
|
|
from datetime import datetime, timedelta
|
|
from decimal import Decimal
|
|
from unittest.mock import AsyncMock, MagicMock, patch, call
|
|
from typing import Any, Dict, List
|
|
|
|
from fastmcp.utilities.tests import temporary_settings
|
|
|
|
# Import server and models
|
|
from mcrentcast.server import (
|
|
app,
|
|
SetApiKeyRequest,
|
|
PropertySearchRequest,
|
|
PropertyByIdRequest,
|
|
ValueEstimateRequest,
|
|
RentEstimateRequest,
|
|
ListingSearchRequest,
|
|
ListingByIdRequest,
|
|
MarketStatsRequest,
|
|
ExpireCacheRequest,
|
|
SetLimitsRequest,
|
|
)
|
|
from mcrentcast.models import (
|
|
PropertyRecord,
|
|
ValueEstimate,
|
|
RentEstimate,
|
|
SaleListing,
|
|
RentalListing,
|
|
MarketStatistics,
|
|
CacheStats,
|
|
ApiLimits,
|
|
)
|
|
from mcrentcast.rentcast_client import (
|
|
RentcastAPIError,
|
|
RateLimitExceeded,
|
|
)
|
|
|
|
|
|
class ReportGenerator:
|
|
"""Enhanced test reporter with syntax highlighting for comprehensive test output."""
|
|
|
|
def __init__(self, test_name: str):
|
|
self.test_name = test_name
|
|
self.inputs = []
|
|
self.processing_steps = []
|
|
self.outputs = []
|
|
self.quality_metrics = []
|
|
self.start_time = datetime.utcnow()
|
|
|
|
def log_input(self, name: str, data: Any, description: str = ""):
|
|
"""Log test input with automatic syntax detection."""
|
|
self.inputs.append({
|
|
"name": name,
|
|
"data": data,
|
|
"description": description,
|
|
"timestamp": datetime.utcnow()
|
|
})
|
|
|
|
def log_processing_step(self, step: str, description: str, duration_ms: float = 0):
|
|
"""Log processing step with timing."""
|
|
self.processing_steps.append({
|
|
"step": step,
|
|
"description": description,
|
|
"duration_ms": duration_ms,
|
|
"timestamp": datetime.utcnow()
|
|
})
|
|
|
|
def log_output(self, name: str, data: Any, quality_score: float = None):
|
|
"""Log test output with quality assessment."""
|
|
self.outputs.append({
|
|
"name": name,
|
|
"data": data,
|
|
"quality_score": quality_score,
|
|
"timestamp": datetime.utcnow()
|
|
})
|
|
|
|
def log_quality_metric(self, metric: str, value: float, threshold: float = None, passed: bool = None):
|
|
"""Log quality metric with pass/fail status."""
|
|
self.quality_metrics.append({
|
|
"metric": metric,
|
|
"value": value,
|
|
"threshold": threshold,
|
|
"passed": passed,
|
|
"timestamp": datetime.utcnow()
|
|
})
|
|
|
|
def complete(self):
|
|
"""Complete test reporting."""
|
|
end_time = datetime.utcnow()
|
|
duration = (end_time - self.start_time).total_seconds() * 1000
|
|
print(f"\n🏠 TEST COMPLETE: {self.test_name} (Duration: {duration:.2f}ms)")
|
|
return {
|
|
"test_name": self.test_name,
|
|
"duration_ms": duration,
|
|
"inputs": len(self.inputs),
|
|
"processing_steps": len(self.processing_steps),
|
|
"outputs": len(self.outputs),
|
|
"quality_metrics": len(self.quality_metrics)
|
|
}
|
|
|
|
|
|
@pytest.fixture
|
|
async def mock_db_manager():
|
|
"""Mock database manager for testing."""
|
|
with patch("mcrentcast.server.db_manager") as mock_db:
|
|
# Configure common mock methods
|
|
mock_db.set_config = AsyncMock()
|
|
mock_db.get_config = AsyncMock()
|
|
mock_db.get_cache_entry = AsyncMock()
|
|
mock_db.set_cache_entry = AsyncMock()
|
|
mock_db.expire_cache_entry = AsyncMock()
|
|
mock_db.clean_expired_cache = AsyncMock()
|
|
mock_db.get_cache_stats = AsyncMock()
|
|
mock_db.get_usage_stats = AsyncMock()
|
|
mock_db.check_confirmation = AsyncMock()
|
|
mock_db.create_confirmation = AsyncMock()
|
|
mock_db.confirm_request = AsyncMock()
|
|
mock_db.create_parameter_hash = MagicMock()
|
|
yield mock_db
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_rentcast_client():
|
|
"""Mock Rentcast client for testing."""
|
|
with patch("mcrentcast.server.get_rentcast_client") as mock_get_client:
|
|
mock_client = MagicMock()
|
|
|
|
# Configure common methods
|
|
mock_client._create_cache_key = MagicMock()
|
|
mock_client._estimate_cost = MagicMock()
|
|
mock_client.close = AsyncMock()
|
|
|
|
# Configure async methods with proper return values
|
|
mock_client.get_property_records = AsyncMock()
|
|
mock_client.get_property_record = AsyncMock()
|
|
mock_client.get_value_estimate = AsyncMock()
|
|
mock_client.get_rent_estimate = AsyncMock()
|
|
mock_client.get_sale_listings = AsyncMock()
|
|
mock_client.get_rental_listings = AsyncMock()
|
|
mock_client.get_market_statistics = AsyncMock()
|
|
|
|
mock_get_client.return_value = mock_client
|
|
yield mock_client
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_property():
|
|
"""Sample property record for testing."""
|
|
return PropertyRecord(
|
|
id="prop_123",
|
|
address="123 Main St",
|
|
city="Austin",
|
|
state="TX",
|
|
zipCode="78701",
|
|
county="Travis",
|
|
propertyType="Single Family",
|
|
bedrooms=3,
|
|
bathrooms=2.0,
|
|
squareFootage=1500,
|
|
yearBuilt=2010,
|
|
lastSalePrice=450000,
|
|
zestimate=465000,
|
|
rentestimate=2800
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_cache_stats():
|
|
"""Sample cache statistics for testing."""
|
|
return CacheStats(
|
|
total_entries=150,
|
|
total_hits=120,
|
|
total_misses=30,
|
|
cache_size_mb=8.5,
|
|
hit_rate=80.0,
|
|
oldest_entry=datetime.utcnow() - timedelta(hours=48),
|
|
newest_entry=datetime.utcnow() - timedelta(minutes=15)
|
|
)
|
|
|
|
|
|
class TestApiKeyManagement:
|
|
"""Test API key management functionality."""
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_set_api_key_success(self, mock_db_manager):
|
|
"""Test successful API key setting."""
|
|
reporter = ReportGenerator("set_api_key_success")
|
|
|
|
api_key = "test_rentcast_key_123"
|
|
request = SetApiKeyRequest(api_key=api_key)
|
|
|
|
reporter.log_input("api_key_request", request.model_dump(), "Valid API key request")
|
|
|
|
with patch("mcrentcast.server.RentcastClient") as mock_client_class:
|
|
mock_client = MagicMock()
|
|
mock_client.close = AsyncMock()
|
|
mock_client_class.return_value = mock_client
|
|
|
|
reporter.log_processing_step("api_key_validation", "Setting API key in settings and database")
|
|
|
|
result = await app.tools["set_api_key"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.5)
|
|
|
|
# Assertions
|
|
assert result["success"] is True
|
|
assert "successfully" in result["message"]
|
|
mock_db_manager.set_config.assert_called_once_with("rentcast_api_key", api_key)
|
|
|
|
reporter.log_quality_metric("success_rate", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_set_api_key_empty(self, mock_db_manager):
|
|
"""Test setting empty API key."""
|
|
reporter = ReportGenerator("set_api_key_empty")
|
|
|
|
request = SetApiKeyRequest(api_key="")
|
|
|
|
reporter.log_input("empty_api_key", request.model_dump(), "Empty API key request")
|
|
|
|
with patch("mcrentcast.server.RentcastClient") as mock_client_class:
|
|
mock_client_class.side_effect = ValueError("Rentcast API key is required")
|
|
|
|
try:
|
|
result = await app.tools["set_api_key"](request)
|
|
reporter.log_output("result", result, quality_score=8.0)
|
|
# Should handle gracefully or raise appropriate error
|
|
except ValueError as e:
|
|
reporter.log_output("error", str(e), quality_score=9.0)
|
|
assert "required" in str(e)
|
|
|
|
reporter.complete()
|
|
|
|
|
|
class TestPropertySearch:
|
|
"""Test property search operations."""
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_search_properties_no_api_key(self):
|
|
"""Test property search without API key configured."""
|
|
reporter = ReportGenerator("search_properties_no_api_key")
|
|
|
|
request = PropertySearchRequest(city="Austin", state="TX")
|
|
reporter.log_input("search_request", request.model_dump(), "Property search without API key")
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=False):
|
|
reporter.log_processing_step("validation", "Checking API key requirement")
|
|
|
|
result = await app.tools["search_properties"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.5)
|
|
|
|
assert "error" in result
|
|
assert "API key not configured" in result["error"]
|
|
assert "set_api_key" in result["message"]
|
|
|
|
reporter.log_quality_metric("error_handling", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_search_properties_cached_hit(self, mock_db_manager, mock_rentcast_client, sample_property):
|
|
"""Test property search with cache hit."""
|
|
reporter = ReportGenerator("search_properties_cached_hit")
|
|
|
|
request = PropertySearchRequest(city="Austin", state="TX", limit=5)
|
|
cache_key = "mock_cache_key_123"
|
|
|
|
reporter.log_input("search_request", request.model_dump(), "Property search with caching")
|
|
|
|
# Configure mocks for cache hit scenario
|
|
mock_rentcast_client._create_cache_key.return_value = cache_key
|
|
mock_rentcast_client.get_property_records.return_value = ([sample_property], True, 4.5)
|
|
mock_db_manager.get_cache_entry.return_value = MagicMock() # Cache hit
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True):
|
|
reporter.log_processing_step("cache_lookup", "Checking cache for existing results")
|
|
|
|
result = await app.tools["search_properties"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.8)
|
|
|
|
# Verify cache hit behavior
|
|
assert result["success"] is True
|
|
assert result["cached"] is True
|
|
assert result["cache_age_hours"] == 4.5
|
|
assert len(result["properties"]) == 1
|
|
assert result["properties"][0]["id"] == "prop_123"
|
|
assert "from cache" in result["message"]
|
|
|
|
# Verify no confirmation was requested (cache hit)
|
|
mock_db_manager.check_confirmation.assert_not_called()
|
|
|
|
reporter.log_quality_metric("cache_hit_rate", 1.0, threshold=0.8, passed=True)
|
|
reporter.log_quality_metric("response_accuracy", 1.0, threshold=0.95, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_search_properties_cache_miss_confirmation(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test property search with cache miss requiring confirmation."""
|
|
reporter = ReportGenerator("search_properties_cache_miss_confirmation")
|
|
|
|
request = PropertySearchRequest(city="Dallas", state="TX")
|
|
|
|
reporter.log_input("search_request", request.model_dump(), "Cache miss requiring confirmation")
|
|
|
|
# Configure mocks for cache miss
|
|
mock_rentcast_client._create_cache_key.return_value = "cache_key_456"
|
|
mock_rentcast_client._estimate_cost.return_value = Decimal("0.10")
|
|
mock_db_manager.get_cache_entry.return_value = None # Cache miss
|
|
mock_db_manager.check_confirmation.return_value = None # No prior confirmation
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True), \
|
|
patch("mcrentcast.server.request_confirmation", return_value=False) as mock_confirm:
|
|
|
|
reporter.log_processing_step("confirmation", "Requesting user confirmation for API call")
|
|
|
|
result = await app.tools["search_properties"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.2)
|
|
|
|
# Verify confirmation request behavior
|
|
assert "confirmation_required" in result
|
|
assert result["confirmation_required"] is True
|
|
assert "$0.10" in result["message"]
|
|
assert "retry" in result["retry_with"]
|
|
|
|
# Verify confirmation was requested
|
|
mock_confirm.assert_called_once()
|
|
mock_db_manager.create_confirmation.assert_called_once()
|
|
|
|
reporter.log_quality_metric("confirmation_accuracy", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_search_properties_confirmed_api_call(self, mock_db_manager, mock_rentcast_client, sample_property):
|
|
"""Test property search with confirmed API call."""
|
|
reporter = ReportGenerator("search_properties_confirmed_api_call")
|
|
|
|
request = PropertySearchRequest(city="Houston", state="TX", force_refresh=True)
|
|
|
|
reporter.log_input("search_request", request.model_dump(), "Confirmed API call with fresh data")
|
|
|
|
# Configure mocks for confirmed API call
|
|
mock_rentcast_client._create_cache_key.return_value = "fresh_cache_key"
|
|
mock_rentcast_client._estimate_cost.return_value = Decimal("0.10")
|
|
mock_rentcast_client.get_property_records.return_value = ([sample_property], False, 0.0)
|
|
mock_db_manager.get_cache_entry.return_value = None # Force refresh
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True), \
|
|
patch("mcrentcast.server.request_confirmation", return_value=True):
|
|
|
|
reporter.log_processing_step("api_call", "Making fresh API call to Rentcast")
|
|
|
|
result = await app.tools["search_properties"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.5)
|
|
|
|
# Verify fresh API call behavior
|
|
assert result["success"] is True
|
|
assert result["cached"] is False
|
|
assert result["cache_age_hours"] == 0.0
|
|
assert len(result["properties"]) == 1
|
|
assert "fresh data" in result["message"]
|
|
|
|
reporter.log_quality_metric("api_call_success", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_search_properties_rate_limit_error(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test property search with rate limit exceeded."""
|
|
reporter = ReportGenerator("search_properties_rate_limit_error")
|
|
|
|
request = PropertySearchRequest(zipCode="90210")
|
|
|
|
reporter.log_input("search_request", request.model_dump(), "Request that triggers rate limit")
|
|
|
|
# Configure mocks for rate limit error
|
|
mock_rentcast_client.get_property_records.side_effect = RateLimitExceeded(
|
|
"Rate limit exceeded. Please wait before making more requests."
|
|
)
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True), \
|
|
patch("mcrentcast.server.request_confirmation", return_value=True):
|
|
|
|
reporter.log_processing_step("rate_limit", "Encountering rate limit error")
|
|
|
|
result = await app.tools["search_properties"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.0)
|
|
|
|
# Verify rate limit error handling
|
|
assert "error" in result
|
|
assert result["error"] == "Rate limit exceeded"
|
|
assert "wait" in result["message"]
|
|
assert "retry_after" in result
|
|
|
|
reporter.log_quality_metric("error_handling", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
|
|
class TestPropertyDetails:
|
|
"""Test individual property details functionality."""
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_get_property_success(self, mock_db_manager, mock_rentcast_client, sample_property):
|
|
"""Test successful property details retrieval."""
|
|
reporter = ReportGenerator("get_property_success")
|
|
|
|
property_id = "prop_123"
|
|
request = PropertyByIdRequest(property_id=property_id)
|
|
|
|
reporter.log_input("property_request", request.model_dump(), "Valid property ID request")
|
|
|
|
# Configure mocks
|
|
mock_rentcast_client._create_cache_key.return_value = f"property_{property_id}"
|
|
mock_rentcast_client._estimate_cost.return_value = Decimal("0.05")
|
|
mock_rentcast_client.get_property_record.return_value = (sample_property, True, 2.5)
|
|
mock_db_manager.get_cache_entry.return_value = MagicMock() # Cache hit
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True):
|
|
reporter.log_processing_step("property_lookup", "Retrieving property details")
|
|
|
|
result = await app.tools["get_property"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.8)
|
|
|
|
# Verify successful property retrieval
|
|
assert result["success"] is True
|
|
assert result["property"]["id"] == "prop_123"
|
|
assert result["property"]["address"] == "123 Main St"
|
|
assert result["cached"] is True
|
|
assert result["cache_age_hours"] == 2.5
|
|
|
|
reporter.log_quality_metric("data_accuracy", 1.0, threshold=0.95, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_get_property_not_found(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test property not found scenario."""
|
|
reporter = ReportGenerator("get_property_not_found")
|
|
|
|
request = PropertyByIdRequest(property_id="nonexistent_123")
|
|
|
|
reporter.log_input("property_request", request.model_dump(), "Invalid property ID")
|
|
|
|
# Configure mocks for property not found
|
|
mock_rentcast_client.get_property_record.return_value = (None, False, 0.0)
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True), \
|
|
patch("mcrentcast.server.request_confirmation", return_value=True):
|
|
|
|
reporter.log_processing_step("property_lookup", "Searching for nonexistent property")
|
|
|
|
result = await app.tools["get_property"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.0)
|
|
|
|
# Verify not found handling
|
|
assert result["success"] is False
|
|
assert "not found" in result["message"]
|
|
|
|
reporter.log_quality_metric("error_handling", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
|
|
class TestValueEstimation:
|
|
"""Test property value estimation functionality."""
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_get_value_estimate_success(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test successful value estimation."""
|
|
reporter = ReportGenerator("get_value_estimate_success")
|
|
|
|
address = "456 Oak Ave, Austin, TX"
|
|
request = ValueEstimateRequest(address=address)
|
|
|
|
reporter.log_input("estimate_request", request.model_dump(), "Value estimate request")
|
|
|
|
# Create sample estimate
|
|
estimate = ValueEstimate(
|
|
address=address,
|
|
price=520000,
|
|
priceRangeLow=480000,
|
|
priceRangeHigh=560000,
|
|
confidence="High",
|
|
lastSaleDate="2023-08-15",
|
|
lastSalePrice=495000
|
|
)
|
|
|
|
# Configure mocks
|
|
mock_rentcast_client._estimate_cost.return_value = Decimal("0.15")
|
|
mock_rentcast_client.get_value_estimate.return_value = (estimate, False, 0.0)
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True), \
|
|
patch("mcrentcast.server.request_confirmation", return_value=True):
|
|
|
|
reporter.log_processing_step("value_estimation", "Calculating property value estimate")
|
|
|
|
result = await app.tools["get_value_estimate"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.6)
|
|
|
|
# Verify successful estimate
|
|
assert result["success"] is True
|
|
assert result["estimate"]["price"] == 520000
|
|
assert result["estimate"]["confidence"] == "High"
|
|
assert "$520,000" in result["message"]
|
|
assert result["cached"] is False
|
|
|
|
reporter.log_quality_metric("estimate_accuracy", 0.95, threshold=0.90, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_get_value_estimate_unavailable(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test value estimation when data is unavailable."""
|
|
reporter = ReportGenerator("get_value_estimate_unavailable")
|
|
|
|
request = ValueEstimateRequest(address="999 Unknown St, Middle, NV")
|
|
|
|
reporter.log_input("estimate_request", request.model_dump(), "Unavailable address request")
|
|
|
|
# Configure mocks for unavailable estimate
|
|
mock_rentcast_client.get_value_estimate.return_value = (None, False, 0.0)
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True), \
|
|
patch("mcrentcast.server.request_confirmation", return_value=True):
|
|
|
|
reporter.log_processing_step("value_estimation", "Attempting estimate for unavailable address")
|
|
|
|
result = await app.tools["get_value_estimate"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=8.5)
|
|
|
|
# Verify unavailable handling
|
|
assert result["success"] is False
|
|
assert "Could not estimate" in result["message"]
|
|
|
|
reporter.log_quality_metric("error_handling", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
|
|
class TestRentEstimation:
|
|
"""Test rent estimation functionality."""
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_get_rent_estimate_full_params(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test rent estimation with full parameters."""
|
|
reporter = ReportGenerator("get_rent_estimate_full_params")
|
|
|
|
request = RentEstimateRequest(
|
|
address="789 Elm St, Dallas, TX",
|
|
propertyType="Single Family",
|
|
bedrooms=4,
|
|
bathrooms=3.0,
|
|
squareFootage=2200
|
|
)
|
|
|
|
reporter.log_input("rent_request", request.model_dump(), "Full parameter rent estimate")
|
|
|
|
# Create sample estimate
|
|
rent_estimate = RentEstimate(
|
|
address=request.address,
|
|
rent=3200,
|
|
rentRangeLow=2900,
|
|
rentRangeHigh=3500,
|
|
confidence="High"
|
|
)
|
|
|
|
# Configure mocks
|
|
mock_rentcast_client._estimate_cost.return_value = Decimal("0.15")
|
|
mock_rentcast_client.get_rent_estimate.return_value = (rent_estimate, True, 6.2)
|
|
mock_db_manager.get_cache_entry.return_value = MagicMock() # Cache hit
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True):
|
|
reporter.log_processing_step("rent_estimation", "Calculating rent with full property details")
|
|
|
|
result = await app.tools["get_rent_estimate"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.7)
|
|
|
|
# Verify successful rent estimate
|
|
assert result["success"] is True
|
|
assert result["estimate"]["rent"] == 3200
|
|
assert result["estimate"]["confidence"] == "High"
|
|
assert "$3,200" in result["message"]
|
|
assert result["cached"] is True
|
|
assert result["cache_age_hours"] == 6.2
|
|
|
|
reporter.log_quality_metric("rent_accuracy", 0.96, threshold=0.85, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_get_rent_estimate_minimal_params(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test rent estimation with minimal parameters."""
|
|
reporter = ReportGenerator("get_rent_estimate_minimal_params")
|
|
|
|
request = RentEstimateRequest(address="321 Pine St, Austin, TX")
|
|
|
|
reporter.log_input("rent_request", request.model_dump(), "Minimal parameter rent estimate")
|
|
|
|
# Create sample estimate with lower confidence
|
|
rent_estimate = RentEstimate(
|
|
address=request.address,
|
|
rent=2800,
|
|
rentRangeLow=2400,
|
|
rentRangeHigh=3200,
|
|
confidence="Medium"
|
|
)
|
|
|
|
mock_rentcast_client.get_rent_estimate.return_value = (rent_estimate, False, 0.0)
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True), \
|
|
patch("mcrentcast.server.request_confirmation", return_value=True):
|
|
|
|
reporter.log_processing_step("rent_estimation", "Calculating rent with address only")
|
|
|
|
result = await app.tools["get_rent_estimate"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=8.8)
|
|
|
|
# Verify estimate with reduced accuracy
|
|
assert result["success"] is True
|
|
assert result["estimate"]["rent"] == 2800
|
|
assert result["estimate"]["confidence"] == "Medium"
|
|
assert result["cached"] is False
|
|
|
|
reporter.log_quality_metric("rent_accuracy", 0.82, threshold=0.70, passed=True)
|
|
reporter.complete()
|
|
|
|
|
|
class TestListings:
|
|
"""Test property listings functionality."""
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_search_sale_listings(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test searching sale listings."""
|
|
reporter = ReportGenerator("search_sale_listings")
|
|
|
|
request = ListingSearchRequest(city="San Antonio", state="TX", limit=3)
|
|
|
|
reporter.log_input("listings_request", request.model_dump(), "Sale listings search")
|
|
|
|
# Create sample sale listings
|
|
sale_listings = [
|
|
SaleListing(
|
|
id="sale_001",
|
|
address="100 River Walk, San Antonio, TX",
|
|
price=395000,
|
|
bedrooms=3,
|
|
bathrooms=2.5,
|
|
squareFootage=1800,
|
|
propertyType="Townhouse",
|
|
daysOnMarket=25
|
|
),
|
|
SaleListing(
|
|
id="sale_002",
|
|
address="200 Market St, San Antonio, TX",
|
|
price=525000,
|
|
bedrooms=4,
|
|
bathrooms=3.0,
|
|
squareFootage=2400,
|
|
propertyType="Single Family",
|
|
daysOnMarket=12
|
|
)
|
|
]
|
|
|
|
# Configure mocks
|
|
mock_rentcast_client._estimate_cost.return_value = Decimal("0.08")
|
|
mock_rentcast_client.get_sale_listings.return_value = (sale_listings, False, 0.0)
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True), \
|
|
patch("mcrentcast.server.request_confirmation", return_value=True):
|
|
|
|
reporter.log_processing_step("listings_search", "Searching for sale listings")
|
|
|
|
result = await app.tools["search_sale_listings"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.4)
|
|
|
|
# Verify sale listings results
|
|
assert result["success"] is True
|
|
assert len(result["listings"]) == 2
|
|
assert result["count"] == 2
|
|
assert result["listings"][0]["id"] == "sale_001"
|
|
assert result["listings"][0]["price"] == 395000
|
|
assert result["listings"][1]["id"] == "sale_002"
|
|
assert "fresh data" in result["message"]
|
|
|
|
reporter.log_quality_metric("listings_relevance", 0.93, threshold=0.80, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_search_rental_listings(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test searching rental listings."""
|
|
reporter = ReportGenerator("search_rental_listings")
|
|
|
|
request = ListingSearchRequest(zipCode="78701", limit=2)
|
|
|
|
reporter.log_input("rental_request", request.model_dump(), "Rental listings search")
|
|
|
|
# Create sample rental listings
|
|
rental_listings = [
|
|
RentalListing(
|
|
id="rent_001",
|
|
address="500 Congress Ave, Austin, TX",
|
|
rent=2400,
|
|
bedrooms=2,
|
|
bathrooms=2.0,
|
|
squareFootage=1200,
|
|
propertyType="Condo",
|
|
availableDate="2024-10-01",
|
|
pets="Cats allowed"
|
|
)
|
|
]
|
|
|
|
# Configure mocks
|
|
mock_rentcast_client._estimate_cost.return_value = Decimal("0.08")
|
|
mock_rentcast_client.get_rental_listings.return_value = (rental_listings, True, 1.8)
|
|
mock_db_manager.get_cache_entry.return_value = MagicMock() # Cache hit
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True):
|
|
reporter.log_processing_step("rental_search", "Searching for rental listings")
|
|
|
|
result = await app.tools["search_rental_listings"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.1)
|
|
|
|
# Verify rental listings results
|
|
assert result["success"] is True
|
|
assert len(result["listings"]) == 1
|
|
assert result["listings"][0]["rent"] == 2400
|
|
assert result["listings"][0]["pets"] == "Cats allowed"
|
|
assert result["cached"] is True
|
|
assert result["cache_age_hours"] == 1.8
|
|
|
|
reporter.log_quality_metric("rental_accuracy", 0.91, threshold=0.85, passed=True)
|
|
reporter.complete()
|
|
|
|
|
|
class TestMarketStatistics:
|
|
"""Test market statistics functionality."""
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_get_market_statistics_city(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test market statistics by city."""
|
|
reporter = ReportGenerator("get_market_statistics_city")
|
|
|
|
request = MarketStatsRequest(city="Austin", state="TX")
|
|
|
|
reporter.log_input("market_request", request.model_dump(), "City-level market statistics")
|
|
|
|
# Create sample market statistics
|
|
market_stats = MarketStatistics(
|
|
city="Austin",
|
|
state="TX",
|
|
medianSalePrice=465000,
|
|
medianRent=2100,
|
|
averageDaysOnMarket=28,
|
|
inventoryCount=1250,
|
|
pricePerSquareFoot=285.50,
|
|
rentPerSquareFoot=1.82,
|
|
appreciation=8.5
|
|
)
|
|
|
|
# Configure mocks
|
|
mock_rentcast_client._estimate_cost.return_value = Decimal("0.20")
|
|
mock_rentcast_client.get_market_statistics.return_value = (market_stats, False, 0.0)
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True), \
|
|
patch("mcrentcast.server.request_confirmation", return_value=True):
|
|
|
|
reporter.log_processing_step("market_analysis", "Analyzing Austin market statistics")
|
|
|
|
result = await app.tools["get_market_statistics"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.8)
|
|
|
|
# Verify market statistics
|
|
assert result["success"] is True
|
|
assert result["statistics"]["city"] == "Austin"
|
|
assert result["statistics"]["medianSalePrice"] == 465000
|
|
assert result["statistics"]["medianRent"] == 2100
|
|
assert result["statistics"]["appreciation"] == 8.5
|
|
assert result["cached"] is False
|
|
|
|
reporter.log_quality_metric("market_data_completeness", 1.0, threshold=0.90, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_get_market_statistics_zipcode(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test market statistics by ZIP code."""
|
|
reporter = ReportGenerator("get_market_statistics_zipcode")
|
|
|
|
request = MarketStatsRequest(zipCode="90210")
|
|
|
|
reporter.log_input("market_request", request.model_dump(), "ZIP code market statistics")
|
|
|
|
# Create sample statistics for expensive area
|
|
market_stats = MarketStatistics(
|
|
zipCode="90210",
|
|
medianSalePrice=2500000,
|
|
medianRent=8500,
|
|
averageDaysOnMarket=45,
|
|
inventoryCount=85,
|
|
pricePerSquareFoot=1250.00,
|
|
rentPerSquareFoot=4.25,
|
|
appreciation=12.3
|
|
)
|
|
|
|
mock_rentcast_client.get_market_statistics.return_value = (market_stats, True, 12.0)
|
|
mock_db_manager.get_cache_entry.return_value = MagicMock()
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True):
|
|
reporter.log_processing_step("market_analysis", "Analyzing 90210 market statistics")
|
|
|
|
result = await app.tools["get_market_statistics"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.5)
|
|
|
|
# Verify high-end market statistics
|
|
assert result["success"] is True
|
|
assert result["statistics"]["zipCode"] == "90210"
|
|
assert result["statistics"]["medianSalePrice"] == 2500000
|
|
assert result["statistics"]["medianRent"] == 8500
|
|
assert result["cached"] is True
|
|
assert result["cache_age_hours"] == 12.0
|
|
|
|
reporter.log_quality_metric("high_value_accuracy", 0.94, threshold=0.85, passed=True)
|
|
reporter.complete()
|
|
|
|
|
|
class TestCacheManagement:
|
|
"""Test cache management functionality."""
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_get_cache_stats_comprehensive(self, mock_db_manager, sample_cache_stats):
|
|
"""Test comprehensive cache statistics retrieval."""
|
|
reporter = ReportGenerator("get_cache_stats_comprehensive")
|
|
|
|
reporter.log_input("cache_request", "get_cache_stats", "Comprehensive cache statistics")
|
|
|
|
# Configure mock with sample stats
|
|
mock_db_manager.get_cache_stats.return_value = sample_cache_stats
|
|
|
|
reporter.log_processing_step("stats_calculation", "Calculating cache performance metrics")
|
|
|
|
result = await app.tools["get_cache_stats"]()
|
|
|
|
reporter.log_output("result", result, quality_score=9.7)
|
|
|
|
# Verify comprehensive statistics
|
|
assert result["success"] is True
|
|
stats = result["stats"]
|
|
assert stats["total_entries"] == 150
|
|
assert stats["total_hits"] == 120
|
|
assert stats["total_misses"] == 30
|
|
assert stats["hit_rate"] == 80.0
|
|
assert stats["cache_size_mb"] == 8.5
|
|
assert "80.0%" in result["message"]
|
|
|
|
reporter.log_quality_metric("cache_efficiency", 0.80, threshold=0.70, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_expire_cache_specific_key(self, mock_db_manager):
|
|
"""Test expiring specific cache key."""
|
|
reporter = ReportGenerator("expire_cache_specific_key")
|
|
|
|
cache_key = "property_records_austin_tx_123456"
|
|
request = ExpireCacheRequest(cache_key=cache_key)
|
|
|
|
reporter.log_input("expire_request", request.model_dump(), "Specific cache key expiration")
|
|
|
|
# Configure mock for successful expiration
|
|
mock_db_manager.expire_cache_entry.return_value = True
|
|
|
|
reporter.log_processing_step("cache_expiration", "Expiring specific cache entry")
|
|
|
|
result = await app.tools["expire_cache"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.5)
|
|
|
|
# Verify specific expiration
|
|
assert result["success"] is True
|
|
assert "expired" in result["message"].lower()
|
|
mock_db_manager.expire_cache_entry.assert_called_once_with(cache_key)
|
|
|
|
reporter.log_quality_metric("expiration_accuracy", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_expire_cache_all(self, mock_db_manager):
|
|
"""Test expiring all cache entries."""
|
|
reporter = ReportGenerator("expire_cache_all")
|
|
|
|
request = ExpireCacheRequest(all=True)
|
|
|
|
reporter.log_input("expire_request", request.model_dump(), "All cache expiration")
|
|
|
|
# Configure mock for bulk expiration
|
|
mock_db_manager.clean_expired_cache.return_value = 45
|
|
|
|
reporter.log_processing_step("bulk_expiration", "Expiring all cache entries")
|
|
|
|
result = await app.tools["expire_cache"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.3)
|
|
|
|
# Verify bulk expiration
|
|
assert result["success"] is True
|
|
assert "45" in result["message"]
|
|
mock_db_manager.clean_expired_cache.assert_called_once()
|
|
|
|
reporter.log_quality_metric("bulk_operation_efficiency", 1.0, threshold=0.95, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_expire_cache_nonexistent_key(self, mock_db_manager):
|
|
"""Test expiring nonexistent cache key."""
|
|
reporter = ReportGenerator("expire_cache_nonexistent_key")
|
|
|
|
request = ExpireCacheRequest(cache_key="nonexistent_key_999")
|
|
|
|
reporter.log_input("expire_request", request.model_dump(), "Nonexistent cache key")
|
|
|
|
# Configure mock for key not found
|
|
mock_db_manager.expire_cache_entry.return_value = False
|
|
|
|
reporter.log_processing_step("cache_expiration", "Attempting to expire nonexistent key")
|
|
|
|
result = await app.tools["expire_cache"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=8.8)
|
|
|
|
# Verify not found handling
|
|
assert result["success"] is False
|
|
assert "not found" in result["message"]
|
|
|
|
reporter.log_quality_metric("error_handling", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
|
|
class TestUsageAndLimits:
|
|
"""Test API usage and limits functionality."""
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_get_usage_stats_default(self, mock_db_manager):
|
|
"""Test getting usage statistics with default period."""
|
|
reporter = ReportGenerator("get_usage_stats_default")
|
|
|
|
reporter.log_input("usage_request", {"days": 30}, "Default 30-day usage statistics")
|
|
|
|
# Create sample usage statistics
|
|
usage_stats = {
|
|
"total_requests": 125,
|
|
"total_cost": 12.50,
|
|
"endpoints": {
|
|
"property-records": 45,
|
|
"value-estimate": 28,
|
|
"rent-estimate-long-term": 32,
|
|
"market-statistics": 20
|
|
},
|
|
"cache_hit_rate": 68.0,
|
|
"average_response_time_ms": 245
|
|
}
|
|
|
|
mock_db_manager.get_usage_stats.return_value = usage_stats
|
|
|
|
reporter.log_processing_step("stats_aggregation", "Aggregating 30-day usage statistics")
|
|
|
|
result = await app.tools["get_usage_stats"](30)
|
|
|
|
reporter.log_output("result", result, quality_score=9.6)
|
|
|
|
# Verify usage statistics
|
|
assert result["success"] is True
|
|
stats = result["stats"]
|
|
assert stats["total_requests"] == 125
|
|
assert stats["total_cost"] == 12.50
|
|
assert stats["cache_hit_rate"] == 68.0
|
|
assert "30 days" in result["message"]
|
|
|
|
reporter.log_quality_metric("usage_tracking_accuracy", 0.96, threshold=0.90, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_set_api_limits_comprehensive(self, mock_db_manager):
|
|
"""Test setting comprehensive API limits."""
|
|
reporter = ReportGenerator("set_api_limits_comprehensive")
|
|
|
|
request = SetLimitsRequest(
|
|
daily_limit=200,
|
|
monthly_limit=5000,
|
|
requests_per_minute=5
|
|
)
|
|
|
|
reporter.log_input("limits_request", request.model_dump(), "Comprehensive API limits update")
|
|
|
|
reporter.log_processing_step("limits_update", "Updating all API rate limits")
|
|
|
|
with patch("mcrentcast.server.settings") as mock_settings:
|
|
# Configure settings mock
|
|
mock_settings.daily_api_limit = 200
|
|
mock_settings.monthly_api_limit = 5000
|
|
mock_settings.requests_per_minute = 5
|
|
|
|
result = await app.tools["set_api_limits"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.8)
|
|
|
|
# Verify limits were set
|
|
assert result["success"] is True
|
|
limits = result["limits"]
|
|
assert limits["daily_limit"] == 200
|
|
assert limits["monthly_limit"] == 5000
|
|
assert limits["requests_per_minute"] == 5
|
|
|
|
# Verify database calls
|
|
expected_calls = [
|
|
call("daily_api_limit", 200),
|
|
call("monthly_api_limit", 5000),
|
|
call("requests_per_minute", 5)
|
|
]
|
|
mock_db_manager.set_config.assert_has_calls(expected_calls, any_order=True)
|
|
|
|
reporter.log_quality_metric("limits_accuracy", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_get_api_limits_with_usage(self, mock_db_manager):
|
|
"""Test getting API limits with current usage."""
|
|
reporter = ReportGenerator("get_api_limits_with_usage")
|
|
|
|
reporter.log_input("limits_request", "get_api_limits", "Current limits and usage")
|
|
|
|
# Configure mock usage data
|
|
daily_usage = {"total_requests": 45}
|
|
monthly_usage = {"total_requests": 850}
|
|
|
|
mock_db_manager.get_usage_stats.side_effect = [daily_usage, monthly_usage]
|
|
|
|
reporter.log_processing_step("usage_calculation", "Calculating current API usage")
|
|
|
|
with patch("mcrentcast.server.settings") as mock_settings:
|
|
mock_settings.daily_api_limit = 100
|
|
mock_settings.monthly_api_limit = 1000
|
|
mock_settings.requests_per_minute = 3
|
|
|
|
result = await app.tools["get_api_limits"]()
|
|
|
|
reporter.log_output("result", result, quality_score=9.4)
|
|
|
|
# Verify limits with usage
|
|
assert result["success"] is True
|
|
limits = result["limits"]
|
|
assert limits["daily_limit"] == 100
|
|
assert limits["current_daily_usage"] == 45
|
|
assert limits["monthly_limit"] == 1000
|
|
assert limits["current_monthly_usage"] == 850
|
|
assert "45/100" in result["message"]
|
|
assert "850/1000" in result["message"]
|
|
|
|
reporter.log_quality_metric("usage_monitoring", 0.94, threshold=0.90, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_set_api_limits_partial(self, mock_db_manager):
|
|
"""Test setting partial API limits."""
|
|
reporter = ReportGenerator("set_api_limits_partial")
|
|
|
|
request = SetLimitsRequest(requests_per_minute=10) # Only update rate limit
|
|
|
|
reporter.log_input("limits_request", request.model_dump(), "Partial limits update")
|
|
|
|
reporter.log_processing_step("partial_update", "Updating only rate limit")
|
|
|
|
with patch("mcrentcast.server.settings") as mock_settings:
|
|
mock_settings.daily_api_limit = 100 # Existing values
|
|
mock_settings.monthly_api_limit = 1000
|
|
mock_settings.requests_per_minute = 10 # Updated value
|
|
|
|
result = await app.tools["set_api_limits"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.2)
|
|
|
|
# Verify only rate limit was updated
|
|
assert result["success"] is True
|
|
limits = result["limits"]
|
|
assert limits["requests_per_minute"] == 10
|
|
|
|
# Verify only one database call
|
|
mock_db_manager.set_config.assert_called_once_with("requests_per_minute", 10)
|
|
|
|
reporter.log_quality_metric("selective_update_accuracy", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
|
|
class TestErrorHandling:
|
|
"""Test comprehensive error handling scenarios."""
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_api_error_handling(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test API error handling."""
|
|
reporter = ReportGenerator("api_error_handling")
|
|
|
|
request = PropertySearchRequest(city="TestCity", state="TX")
|
|
|
|
reporter.log_input("error_request", request.model_dump(), "Request triggering API error")
|
|
|
|
# Configure mock for API error
|
|
mock_rentcast_client.get_property_records.side_effect = RentcastAPIError(
|
|
"Invalid API key or quota exceeded"
|
|
)
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True), \
|
|
patch("mcrentcast.server.request_confirmation", return_value=True):
|
|
|
|
reporter.log_processing_step("error_simulation", "Simulating Rentcast API error")
|
|
|
|
result = await app.tools["search_properties"](request)
|
|
|
|
reporter.log_output("result", result, quality_score=9.0)
|
|
|
|
# Verify API error handling
|
|
assert "error" in result
|
|
assert result["error"] == "API error"
|
|
assert "quota exceeded" in result["message"]
|
|
|
|
reporter.log_quality_metric("api_error_handling", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.asyncio
|
|
async def test_database_error_handling(self, mock_db_manager):
|
|
"""Test database error handling."""
|
|
reporter = ReportGenerator("database_error_handling")
|
|
|
|
reporter.log_input("db_error_request", "get_cache_stats", "Database error simulation")
|
|
|
|
# Configure mock for database error
|
|
mock_db_manager.get_cache_stats.side_effect = Exception("Database connection failed")
|
|
|
|
reporter.log_processing_step("db_error_simulation", "Simulating database failure")
|
|
|
|
result = await app.tools["get_cache_stats"]()
|
|
|
|
reporter.log_output("result", result, quality_score=8.5)
|
|
|
|
# Verify database error handling
|
|
assert "error" in result
|
|
assert result["error"] == "Internal error"
|
|
assert "connection failed" in result["message"]
|
|
|
|
reporter.log_quality_metric("db_error_handling", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
|
|
class TestRateLimiting:
|
|
"""Test rate limiting behavior."""
|
|
|
|
@pytest.mark.integration
|
|
@pytest.mark.asyncio
|
|
async def test_rate_limit_backoff(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test exponential backoff on rate limits."""
|
|
reporter = ReportGenerator("rate_limit_backoff")
|
|
|
|
request = PropertySearchRequest(city="TestCity", state="CA")
|
|
|
|
reporter.log_input("rate_limit_request", request.model_dump(), "Rate limiting test")
|
|
|
|
# Configure mock for rate limit on first call, success on retry
|
|
mock_rentcast_client.get_property_records.side_effect = [
|
|
RateLimitExceeded("Rate limit exceeded"),
|
|
([PropertyRecord(id="test_123", address="Test Address")], False, 0.0)
|
|
]
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True), \
|
|
patch("mcrentcast.server.request_confirmation", return_value=True):
|
|
|
|
reporter.log_processing_step("rate_limit_test", "Testing rate limit and backoff")
|
|
|
|
# First call should fail with rate limit
|
|
result1 = await app.tools["search_properties"](request)
|
|
|
|
assert "error" in result1
|
|
assert result1["error"] == "Rate limit exceeded"
|
|
|
|
reporter.log_quality_metric("rate_limit_detection", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.performance
|
|
@pytest.mark.asyncio
|
|
async def test_concurrent_requests_rate_limiting(self, mock_db_manager, mock_rentcast_client):
|
|
"""Test rate limiting with concurrent requests."""
|
|
reporter = ReportGenerator("concurrent_requests_rate_limiting")
|
|
|
|
# Create multiple concurrent requests
|
|
requests = [
|
|
PropertySearchRequest(city=f"City_{i}", state="TX")
|
|
for i in range(5)
|
|
]
|
|
|
|
reporter.log_input("concurrent_requests", len(requests), "Multiple concurrent requests")
|
|
|
|
# Configure mocks for rate limiting
|
|
mock_rentcast_client.get_property_records.side_effect = RateLimitExceeded(
|
|
"Too many requests"
|
|
)
|
|
|
|
with patch("mcrentcast.server.check_api_key", return_value=True), \
|
|
patch("mcrentcast.server.request_confirmation", return_value=True):
|
|
|
|
reporter.log_processing_step("concurrent_test", "Processing concurrent requests")
|
|
|
|
# Execute concurrent requests
|
|
tasks = [
|
|
app.tools["search_properties"](req)
|
|
for req in requests
|
|
]
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
reporter.log_output("concurrent_results", len(results), quality_score=8.8)
|
|
|
|
# Verify all requests handled rate limiting appropriately
|
|
for result in results:
|
|
if isinstance(result, dict):
|
|
assert "error" in result
|
|
assert "Rate limit" in result["error"]
|
|
|
|
reporter.log_quality_metric("concurrent_handling", 1.0, threshold=0.95, passed=True)
|
|
reporter.complete()
|
|
|
|
|
|
class TestMockApiMode:
|
|
"""Test mock API mode functionality."""
|
|
|
|
@pytest.mark.integration
|
|
@pytest.mark.asyncio
|
|
async def test_mock_api_mode_property_search(self, mock_db_manager):
|
|
"""Test property search in mock API mode."""
|
|
reporter = ReportGenerator("mock_api_mode_property_search")
|
|
|
|
request = PropertySearchRequest(city="MockCity", state="TX")
|
|
|
|
reporter.log_input("mock_request", request.model_dump(), "Mock API mode test")
|
|
|
|
with temporary_settings(use_mock_api=True), \
|
|
patch("mcrentcast.server.check_api_key", return_value=True), \
|
|
patch("mcrentcast.server.request_confirmation", return_value=True):
|
|
|
|
# In mock mode, we need to mock the actual client behavior
|
|
with patch("mcrentcast.rentcast_client.RentcastClient") as mock_client_class:
|
|
mock_client = MagicMock()
|
|
mock_client.get_property_records.return_value = (
|
|
[PropertyRecord(id="mock_123", address="Mock Address", city="MockCity")],
|
|
False,
|
|
0.0
|
|
)
|
|
mock_client_class.return_value = mock_client
|
|
|
|
reporter.log_processing_step("mock_api_call", "Using mock API for testing")
|
|
|
|
# Note: This would require actual mock API integration
|
|
# For now, we'll test the configuration
|
|
from mcrentcast.config import Settings
|
|
settings = Settings(use_mock_api=True)
|
|
|
|
assert settings.use_mock_api is True
|
|
assert "mock-rentcast-api" in settings.mock_api_url
|
|
|
|
reporter.log_quality_metric("mock_configuration", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
|
|
# Test Markers and Categories
|
|
@pytest.mark.smoke
|
|
class TestSmokeTests:
|
|
"""Smoke tests for basic functionality."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_all_tools_exist(self):
|
|
"""Test that all 13 expected tools exist."""
|
|
reporter = ReportGenerator("all_tools_exist")
|
|
|
|
expected_tools = [
|
|
"set_api_key",
|
|
"get_api_limits",
|
|
"set_api_limits",
|
|
"search_properties",
|
|
"get_property", # Note: server defines this as get_property, not get_property_details
|
|
"get_value_estimate",
|
|
"get_rent_estimate",
|
|
"search_sale_listings",
|
|
"search_rental_listings",
|
|
"get_market_statistics",
|
|
"expire_cache",
|
|
"get_cache_stats",
|
|
"get_usage_stats"
|
|
]
|
|
|
|
reporter.log_input("expected_tools", expected_tools, "List of expected MCP tools")
|
|
|
|
# Use FastMCP's async get_tools method
|
|
tools_dict = await app.get_tools()
|
|
actual_tools = list(tools_dict.keys())
|
|
reporter.log_output("actual_tools", actual_tools, quality_score=9.9)
|
|
|
|
missing_tools = set(expected_tools) - set(actual_tools)
|
|
extra_tools = set(actual_tools) - set(expected_tools)
|
|
|
|
if missing_tools:
|
|
reporter.log_quality_metric("missing_tools", len(missing_tools), threshold=0, passed=False)
|
|
|
|
if extra_tools:
|
|
reporter.log_quality_metric("extra_tools", len(extra_tools), threshold=float('inf'), passed=True)
|
|
|
|
# Verify all expected tools exist
|
|
for tool in expected_tools:
|
|
assert tool in actual_tools, f"Tool '{tool}' not found in MCP server. Available: {actual_tools}"
|
|
|
|
assert len(actual_tools) >= len(expected_tools), "Not all expected tools are present"
|
|
|
|
reporter.log_quality_metric("tool_completeness", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_basic_server_functionality(self):
|
|
"""Test basic server functionality without external dependencies."""
|
|
reporter = ReportGenerator("basic_server_functionality")
|
|
|
|
reporter.log_processing_step("server_check", "Verifying basic server setup")
|
|
|
|
# Test that app is properly configured
|
|
assert app.name == "mcrentcast"
|
|
|
|
# Test that we can access tools
|
|
tools = await app.get_tools()
|
|
assert len(tools) > 0
|
|
assert "set_api_key" in tools
|
|
# Test that tool has proper attributes
|
|
tool = tools["set_api_key"]
|
|
assert hasattr(tool, 'name')
|
|
assert tool.name == "set_api_key"
|
|
assert hasattr(tool, 'description')
|
|
|
|
reporter.log_quality_metric("basic_functionality", 1.0, threshold=1.0, passed=True)
|
|
reporter.complete()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Run comprehensive test suite
|
|
print("🏠 Starting comprehensive mcrentcast MCP server tests...")
|
|
print("🧪 Testing all 13 MCP tools with caching, rate limiting, and error handling")
|
|
|
|
# Example of how to run specific test categories:
|
|
# pytest tests/test_mcp_server.py -m unit -v
|
|
# pytest tests/test_mcp_server.py -m integration -v
|
|
# pytest tests/test_mcp_server.py -m smoke -v
|
|
# pytest tests/test_mcp_server.py -m performance -v |