Add cache attribution and fix broken DB session methods
- Add fetched_by_key_hash column to CacheEntry model for tracking which Rentcast API key originally fetched each cached response.
- Fix broken self.db references in SQLiteCacheBackend methods: delete(), clear_pattern(), and health_check() now properly create sessions from SessionLocal.
- Add SQL injection protection in clear_pattern() by escaping LIKE wildcards (%, _) before pattern conversion.
- Update proxy_to_rentcast to hash and store the client's Rentcast API key on cache miss.
- Fix httpx test client to use ASGITransport (required for httpx 0.27+).
This commit is contained in:
parent
26fee9ae74
commit
dde8bd04de
@ -135,7 +135,8 @@ class SQLiteCacheBackend(CacheBackend):
|
|||||||
headers_json=json.dumps(data.get('headers', {})),
|
headers_json=json.dumps(data.get('headers', {})),
|
||||||
expires_at=expires_at,
|
expires_at=expires_at,
|
||||||
ttl_seconds=ttl,
|
ttl_seconds=ttl,
|
||||||
estimated_cost=data.get('estimated_cost', 0.0)
|
estimated_cost=data.get('estimated_cost', 0.0),
|
||||||
|
fetched_by_key_hash=data.get('fetched_by_key_hash'),
|
||||||
)
|
)
|
||||||
db.add(new_entry)
|
db.add(new_entry)
|
||||||
|
|
||||||
@ -150,46 +151,53 @@ class SQLiteCacheBackend(CacheBackend):
|
|||||||
|
|
||||||
async def delete(self, key: str) -> bool:
    """Soft-delete a cache entry by marking it invalid.

    Args:
        key: The cache key identifying the entry to invalidate.

    Returns:
        True if the invalidation was committed, False if any error
        occurred (the session is rolled back in that case).
    """
    # The backend holds a session *factory* (SessionLocal), not a
    # long-lived session — the pre-fix `self.db` attribute never existed,
    # so every call raised AttributeError.
    async with self.SessionLocal() as db:
        try:
            stmt = update(CacheEntry).where(
                CacheEntry.cache_key == key
            ).values(is_valid=False)
            await db.execute(stmt)
            await db.commit()
            logger.debug(f"Cache deleted: {key}")
            return True
        except Exception as e:
            # Best-effort contract: callers get False rather than an
            # exception; the failed transaction is rolled back.
            logger.error(f"Error deleting cache entry {key}: {e}")
            await db.rollback()
            return False
async def clear_pattern(self, pattern: str) -> int:
    """Soft-delete all cache entries whose key matches a glob pattern.

    Args:
        pattern: Glob-style pattern where '*' matches any sequence of
            characters (e.g. "rentcast:properties:*"). Literal '%' and
            '_' in the pattern are matched literally, not as SQL
            wildcards.

    Returns:
        Number of entries invalidated, or 0 on error.
    """
    async with self.SessionLocal() as db:
        try:
            # Escape SQL LIKE wildcards that may appear literally in the
            # caller's pattern *before* translating the glob '*' to '%';
            # otherwise a key containing '%' or '_' would over-match
            # (LIKE-wildcard injection).
            escaped = pattern.replace('%', r'\%').replace('_', r'\_')
            sql_pattern = escaped.replace('*', '%')

            stmt = update(CacheEntry).where(
                # escape='\\' tells LIKE to treat backslash-prefixed
                # wildcard characters as literals.
                CacheEntry.cache_key.like(sql_pattern, escape='\\')
            ).values(is_valid=False)

            result = await db.execute(stmt)
            await db.commit()

            count = result.rowcount
            logger.info(f"Cleared {count} cache entries matching pattern: {pattern}")
            return count
        except Exception as e:
            logger.error(f"Error clearing cache pattern {pattern}: {e}")
            await db.rollback()
            return 0
async def health_check(self) -> bool:
    """Check SQLite database health with a trivial SELECT round trip.

    Returns:
        True if ``SELECT 1`` executes successfully on a fresh session,
        False otherwise.
    """
    # Open a session from the factory; the pre-fix code referenced a
    # nonexistent `self.db` and could never succeed.
    async with self.SessionLocal() as db:
        try:
            await db.execute(select(1))
            return True
        except Exception as e:
            logger.error(f"SQLite health check failed: {e}")
            return False
async def _mark_invalid(self, entry_id: int, db: AsyncSession):
|
async def _mark_invalid(self, entry_id: int, db: AsyncSession):
|
||||||
"""Mark specific entry as invalid."""
|
"""Mark specific entry as invalid."""
|
||||||
|
|||||||
@ -56,7 +56,10 @@ class CacheEntry(Base, TimestampMixin):
|
|||||||
|
|
||||||
# Cost tracking (if applicable)
|
# Cost tracking (if applicable)
|
||||||
estimated_cost = Column(Float, default=0.0)
|
estimated_cost = Column(Float, default=0.0)
|
||||||
|
|
||||||
|
# Attribution - which Rentcast API key originally fetched this data
|
||||||
|
fetched_by_key_hash = Column(String(64), index=True, nullable=True)
|
||||||
|
|
||||||
__table_args__ = (
|
__table_args__ = (
|
||||||
Index('idx_cache_valid_expires', 'is_valid', 'expires_at'),
|
Index('idx_cache_valid_expires', 'is_valid', 'expires_at'),
|
||||||
Index('idx_cache_endpoint_method', 'endpoint', 'method'),
|
Index('idx_cache_endpoint_method', 'endpoint', 'method'),
|
||||||
|
|||||||
@ -449,7 +449,8 @@ async def proxy_to_rentcast(
|
|||||||
response_data = response.json() if response.content else {}
|
response_data = response.json() if response.content else {}
|
||||||
response_time = (time.time() - start_time) * 1000
|
response_time = (time.time() - start_time) * 1000
|
||||||
|
|
||||||
# Store in cache
|
# Store in cache with attribution to the key that fetched it
|
||||||
|
key_hash = hashlib.sha256(rentcast_key.encode()).hexdigest() if rentcast_key else None
|
||||||
cache_entry_data = {
|
cache_entry_data = {
|
||||||
"data": response_data,
|
"data": response_data,
|
||||||
"status_code": response.status_code,
|
"status_code": response.status_code,
|
||||||
@ -458,7 +459,8 @@ async def proxy_to_rentcast(
|
|||||||
"method": method,
|
"method": method,
|
||||||
"params": cache_data,
|
"params": cache_data,
|
||||||
"params_hash": hashlib.md5(json.dumps(cache_data, sort_keys=True).encode()).hexdigest(),
|
"params_hash": hashlib.md5(json.dumps(cache_data, sort_keys=True).encode()).hexdigest(),
|
||||||
"estimated_cost": endpoint_config["cost_estimate"]
|
"estimated_cost": endpoint_config["cost_estimate"],
|
||||||
|
"fetched_by_key_hash": key_hash,
|
||||||
}
|
}
|
||||||
|
|
||||||
ttl = ttl_override or endpoint_config["ttl"]
|
ttl = ttl_override or endpoint_config["ttl"]
|
||||||
|
|||||||
@ -4,7 +4,7 @@ Pytest configuration for RentCache tests.
|
|||||||
import asyncio
|
import asyncio
|
||||||
import pytest
|
import pytest
|
||||||
import pytest_asyncio
|
import pytest_asyncio
|
||||||
from httpx import AsyncClient
|
from httpx import AsyncClient, ASGITransport
|
||||||
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
|
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
|
||||||
from sqlalchemy.pool import StaticPool
|
from sqlalchemy.pool import StaticPool
|
||||||
|
|
||||||
@ -63,7 +63,8 @@ async def test_client(test_session):
|
|||||||
|
|
||||||
app.dependency_overrides[get_db] = override_get_db
|
app.dependency_overrides[get_db] = override_get_db
|
||||||
|
|
||||||
async with AsyncClient(app=app, base_url="http://test") as client:
|
transport = ASGITransport(app=app)
|
||||||
|
async with AsyncClient(transport=transport, base_url="http://test") as client:
|
||||||
yield client
|
yield client
|
||||||
|
|
||||||
app.dependency_overrides.clear()
|
app.dependency_overrides.clear()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user