"""
Comprehensive tests for JavaScript execution API enhancements.
Tests the proposed JavaScript execution features in get(), get_many(),
and discover() functions using a mock HTTP server.
"""
import asyncio
import json
import pytest
from aiohttp import web
from aiohttp.test_utils import TestServer
from unittest.mock import AsyncMock, MagicMock, patch
from typing import Dict, Any, List
# These imports assume the enhanced API is implemented
# For now, we'll test against the proposed interface
from crawailer import Browser, BrowserConfig
from crawailer.content import WebContent
from crawailer.api import get, get_many, discover
class MockHTTPServer:
    """Local aiohttp application serving the pages used by the JavaScript tests."""

    def __init__(self):
        self.server = None
        self.port = None
        self.app = web.Application()
        self.setup_routes()

    def setup_routes(self):
        """Register a GET handler for every test scenario."""
        route_table = (
            ('/', self.index_page),
            ('/dynamic-price', self.dynamic_price_page),
            ('/infinite-scroll', self.infinite_scroll_page),
            ('/load-more', self.load_more_page),
            ('/spa-content', self.spa_page),
            ('/search', self.search_results_page),
            ('/article/{id}', self.article_page),
            ('/api/data', self.api_endpoint),
        )
        for path, handler in route_table:
            self.app.router.add_get(path, handler)

    async def start(self):
        """Start a TestServer (created with port=0) and return its base URL.

        The URL is built from the port the started server reports.
        """
        test_server = TestServer(self.app, port=0)
        self.server = test_server
        await test_server.start()
        self.port = test_server.port
        return f"http://localhost:{self.port}"

    async def stop(self):
        """Close the server if one was started; otherwise do nothing."""
        if not self.server:
            return
        await self.server.close()

    @staticmethod
    def _html_response(body):
        """Wrap *body* in a text/html response."""
        return web.Response(text=body, content_type='text/html')

    async def index_page(self, request):
        """Serve the plain index page."""
        body = (
            "\n"
            "Test Page\n"
            "Test Page\n"
            "Initial content\n"
        )
        return self._html_response(body)

    async def dynamic_price_page(self, request):
        """Serve the product page used by the price-extraction tests."""
        body = (
            "\n"
            "Product Page\n"
            "Amazing Product\n"
            "$99.99\n"
        )
        return self._html_response(body)

    async def infinite_scroll_page(self, request):
        """Serve the page used by the infinite-scroll tests."""
        body = (
            "\n"
            "Infinite Scroll\n"
            "Infinite Content\n"
            "Loading...\n"
            "No more content\n"
        )
        return self._html_response(body)

    async def load_more_page(self, request):
        """Serve the article list with 'Read More' / 'Load More' text."""
        body = (
            "\n"
            "Load More Content\n"
            "Articles\n"
            "Article 1\n"
            "Preview text...\n"
            "Read More\n"
            "Full article content here...\n"
            "Article 2\n"
            "Preview text...\n"
            "Read More\n"
            "Full article content here...\n"
            "Load More Articles\n"
        )
        return self._html_response(body)

    async def spa_page(self, request):
        """Serve the single-page-application placeholder page."""
        body = (
            "\n"
            "SPA Demo\n"
            "Loading...\n"
        )
        return self._html_response(body)

    async def search_results_page(self, request):
        """Serve a search results page for the ``q`` query parameter.

        Falls back to the query ``'test'`` when ``q`` is absent.
        """
        query = request.query.get('q', 'test')
        body = (
            "\n"
            f"Search: {query}\n"
            f'Search Results for "{query}"\n'
            "Show More Results\n"
        )
        return self._html_response(body)

    async def article_page(self, request):
        """Serve the article page for the ``id`` path segment."""
        article_id = request.match_info['id']
        body = (
            "\n"
            f"Article {article_id}\n"
            f"Article {article_id}\n"
            "Author Name\n"
            "2024-01-01\n"
            "Click to expand abstract...\n"
            f"This is the full abstract of article {article_id}.\n"
            "It contains detailed information about the research.\n"
            "Article content goes here...\n"
        )
        return self._html_response(body)

    async def api_endpoint(self, request):
        """Serve a fixed JSON payload."""
        payload = {
            "status": "success",
            "data": {"items": [1, 2, 3]},
        }
        return web.json_response(payload)
@pytest.fixture
async def mock_server():
    """Start a MockHTTPServer for the test and stop it afterwards.

    Yields:
        A ``(server, base_url)`` tuple, where ``base_url`` is the value
        returned by ``MockHTTPServer.start()``.
    """
    http_server = MockHTTPServer()
    root_url = await http_server.start()
    yield http_server, root_url
    # Teardown: runs once the requesting test has finished.
    await http_server.stop()
@pytest.fixture
def mock_browser():
    """Patch ``crawailer.api._browser`` and yield the mocked Browser.

    The yielded object is an ``AsyncMock`` constrained to the ``Browser``
    spec; it is installed as the return value of the patched attribute for
    the duration of the test.
    """
    with patch('crawailer.api._browser') as patched_factory:
        fake_browser = AsyncMock(spec=Browser)
        patched_factory.return_value = fake_browser
        yield fake_browser
# Test JavaScript execution in get() function
class TestGetWithJavaScript:
    """Tests for get() function with JavaScript execution.

    ``get`` is imported into this module by name, so every test patches this
    module's own binding (``f"{__name__}.get"``). Patching
    ``crawailer.api.get`` instead would leave the already-imported name
    pointing at the real function, and the mock would never be called.
    """

    @pytest.mark.asyncio
    async def test_get_with_script_before(self, mock_server):
        """Test executing JavaScript before content extraction."""
        _, base_url = mock_server
        url = f"{base_url}/dynamic-price"
        script = "document.querySelector('.final-price').innerText"

        with patch(f"{__name__}.get", new_callable=AsyncMock) as mock_get:
            # Simulate successful JS execution
            mock_get.return_value = WebContent(
                url=url,
                title="Product Page",
                text="Amazing Product $79.99",
                markdown="# Amazing Product\n\n$79.99",
                html="...",
                script_result="$79.99",
            )
            # Call with script
            content = await get(url, script=script, wait_for=".final-price")

        mock_get.assert_awaited_once_with(
            url, script=script, wait_for=".final-price"
        )
        assert content.script_result == "$79.99"
        assert "$79.99" in content.text

    @pytest.mark.asyncio
    async def test_get_with_scroll_script(self, mock_server):
        """Test scrolling to load more content."""
        _, base_url = mock_server
        url = f"{base_url}/infinite-scroll"
        scroll_script = (
            "\n"
            "window.scrollTo(0, document.body.scrollHeight);\n"
            "await new Promise(r => setTimeout(r, 600));\n"
            "window.loadMore();\n"
        )

        with patch(f"{__name__}.get", new_callable=AsyncMock) as mock_get:
            # Simulate content after scrolling
            mock_get.return_value = WebContent(
                url=url,
                title="Infinite Scroll",
                text="Infinite Content Item 1 Item 2 Item 3 Item 4 Item 5 Item 6",
                markdown="# Infinite Content\n\nItem 1\nItem 2\nItem 3\nItem 4\nItem 5\nItem 6",
                html="...",
                script_result=None,
            )
            content = await get(
                url,
                script_before=scroll_script,
                wait_for=".end-of-content",
            )

        mock_get.assert_awaited_once_with(
            url, script_before=scroll_script, wait_for=".end-of-content"
        )
        # Should have more items after scrolling
        assert "Item 6" in content.text

    @pytest.mark.asyncio
    async def test_get_with_click_expand(self, mock_server):
        """Test clicking buttons to expand content."""
        _, base_url = mock_server
        url = f"{base_url}/load-more"
        click_script = (
            "\n"
            "document.querySelectorAll('.read-more-btn').forEach(btn => btn.click());\n"
        )

        with patch(f"{__name__}.get", new_callable=AsyncMock) as mock_get:
            # Simulate expanded content
            mock_get.return_value = WebContent(
                url=url,
                title="Load More Content",
                text="Articles Article 1 Full article content here... Article 2 Full article content here...",
                markdown="# Articles\n\n## Article 1\n\nFull article content here...\n\n## Article 2\n\nFull article content here...",
                html="...",
            )
            content = await get(url, script_before=click_script)

        mock_get.assert_awaited_once_with(url, script_before=click_script)
        assert "Full article content" in content.text

    @pytest.mark.asyncio
    async def test_get_spa_wait_for_app(self, mock_server):
        """Test waiting for SPA to initialize."""
        _, base_url = mock_server
        url = f"{base_url}/spa-content"

        with patch(f"{__name__}.get", new_callable=AsyncMock) as mock_get:
            mock_get.return_value = WebContent(
                url=url,
                title="SPA Demo",
                text="SPA Content Loaded User: John Doe Status: Active",
                markdown="# SPA Content Loaded\n\nUser: John Doe\nStatus: Active",
                html="...",
                script_result=True,
            )
            content = await get(url, script="window.appReady", wait_for=".app")

        mock_get.assert_awaited_once_with(
            url, script="window.appReady", wait_for=".app"
        )
        assert content.script_result is True
        assert "John Doe" in content.text
        assert "Active" in content.text

    @pytest.mark.asyncio
    async def test_get_script_error_handling(self, mock_server):
        """Test handling of JavaScript execution errors."""
        _, base_url = mock_server
        url = f"{base_url}/"

        with patch(f"{__name__}.get", new_callable=AsyncMock) as mock_get:
            mock_get.return_value = WebContent(
                url=url,
                title="Test Page",
                text="Test Page Initial content",
                markdown="# Test Page\n\nInitial content",
                html="...",
                script_error="ReferenceError: nonexistent is not defined",
            )
            content = await get(url, script="nonexistent.function()")

        mock_get.assert_awaited_once_with(url, script="nonexistent.function()")
        assert content.script_error is not None
        assert "ReferenceError" in content.script_error
# Test JavaScript execution in get_many() function
class TestGetManyWithJavaScript:
    """Tests for get_many() function with JavaScript execution.

    ``get_many`` is imported into this module by name, so every test patches
    this module's own binding (``f"{__name__}.get_many"``). Patching
    ``crawailer.api.get_many`` instead would not affect the name the tests
    actually call.
    """

    @pytest.mark.asyncio
    async def test_get_many_same_script(self, mock_server):
        """Test applying same script to multiple URLs."""
        _, base_url = mock_server
        urls = [
            f"{base_url}/load-more",
            f"{base_url}/article/1",
            f"{base_url}/article/2",
        ]
        script = "document.querySelectorAll('[onclick]').forEach(el => el.click())"

        with patch(f"{__name__}.get_many", new_callable=AsyncMock) as mock_get_many:
            mock_get_many.return_value = [
                WebContent(
                    url=urls[0],
                    title="Load More Content",
                    text="Expanded content",
                    markdown="# Load More\n\nExpanded content",
                    html="...",
                ),
                WebContent(
                    url=urls[1],
                    title="Article 1",
                    text="Full abstract shown",
                    markdown="# Article 1\n\nFull abstract shown",
                    html="...",
                ),
                WebContent(
                    url=urls[2],
                    title="Article 2",
                    text="Full abstract shown",
                    markdown="# Article 2\n\nFull abstract shown",
                    html="...",
                ),
            ]
            results = await get_many(urls, script=script)

        mock_get_many.assert_awaited_once_with(urls, script=script)
        assert len(results) == 3
        for result in results:
            assert result is not None

    @pytest.mark.asyncio
    async def test_get_many_different_scripts(self, mock_server):
        """Test applying different scripts to different URLs."""
        _, base_url = mock_server
        urls = [
            f"{base_url}/infinite-scroll",
            f"{base_url}/load-more",
            f"{base_url}/spa-content",
        ]
        scripts = [
            "window.scrollTo(0, document.body.scrollHeight)",
            "document.querySelector('.load-more').click()",
            "window.appReady",
        ]

        with patch(f"{__name__}.get_many", new_callable=AsyncMock) as mock_get_many:
            mock_get_many.return_value = [
                WebContent(
                    url=urls[0],
                    title="Infinite Scroll",
                    text="More items loaded",
                    markdown="# Infinite Scroll\n\nMore items",
                    html="...",
                    script_result=None,
                ),
                WebContent(
                    url=urls[1],
                    title="Load More Content",
                    text="More articles loaded",
                    markdown="# Load More\n\nMore articles",
                    html="...",
                    script_result=None,
                ),
                WebContent(
                    url=urls[2],
                    title="SPA Demo",
                    text="SPA loaded",
                    markdown="# SPA Demo\n\nLoaded",
                    html="...",
                    script_result=True,
                ),
            ]
            results = await get_many(urls, script=scripts)

        mock_get_many.assert_awaited_once_with(urls, script=scripts)
        assert len(results) == 3
        assert results[2].script_result is True

    @pytest.mark.asyncio
    async def test_get_many_mixed_scripts(self, mock_server):
        """Test mix of URLs with and without scripts."""
        _, base_url = mock_server
        urls = [
            f"{base_url}/",  # No script needed
            f"{base_url}/dynamic-price",  # Needs script
            f"{base_url}/api/data",  # No script needed
        ]
        scripts = [
            None,
            "document.querySelector('.final-price').innerText",
            None,
        ]

        with patch(f"{__name__}.get_many", new_callable=AsyncMock) as mock_get_many:
            mock_get_many.return_value = [
                WebContent(
                    url=urls[0],
                    title="Test Page",
                    text="Initial content",
                    markdown="# Test Page",
                    html="...",
                ),
                WebContent(
                    url=urls[1],
                    title="Product Page",
                    text="Price: $79.99",
                    markdown="# Product\n\n$79.99",
                    html="...",
                    script_result="$79.99",
                ),
                WebContent(
                    url=urls[2],
                    title="API Response",
                    text='{"status":"success"}',
                    markdown="API data",
                    html="...",
                ),
            ]
            results = await get_many(urls, script=scripts)

        mock_get_many.assert_awaited_once_with(urls, script=scripts)
        assert results[0].script_result is None
        assert results[1].script_result == "$79.99"
        assert results[2].script_result is None
# Test JavaScript execution in discover() function
class TestDiscoverWithJavaScript:
    """Tests for discover() function with JavaScript execution.

    ``discover`` is imported into this module by name, so every test patches
    this module's own binding (``f"{__name__}.discover"``). Patching
    ``crawailer.api.discover`` instead would not affect the name the tests
    actually call.
    """

    @pytest.mark.asyncio
    async def test_discover_with_search_script(self, mock_server):
        """Test executing script on search results page."""
        _, base_url = mock_server
        search_script = "document.querySelector('.show-more')?.click()"

        with patch(f"{__name__}.discover", new_callable=AsyncMock) as mock_discover:
            # Simulate discovering more results after clicking "Show More"
            mock_discover.return_value = [
                WebContent(
                    url=f"{base_url}/article/1",
                    title="Result 1: AI",
                    text="Article about AI",
                    markdown="# Result 1\n\nAI content",
                    html="...",
                ),
                WebContent(
                    url=f"{base_url}/article/2",
                    title="Result 2: AI",
                    text="Another AI article",
                    markdown="# Result 2\n\nMore AI",
                    html="...",
                ),
                WebContent(
                    url=f"{base_url}/article/3",
                    title="Result 3: AI",
                    text="Third AI article",
                    markdown="# Result 3\n\nAI research",
                    html="...",
                ),
            ]
            results = await discover(
                "AI research",
                script=search_script,
                max_pages=5,
            )

        mock_discover.assert_awaited_once_with(
            "AI research", script=search_script, max_pages=5
        )
        assert len(results) == 3
        assert all("AI" in r.title for r in results)

    @pytest.mark.asyncio
    async def test_discover_with_content_script(self, mock_server):
        """Test executing script on each discovered page."""
        _, base_url = mock_server
        content_script = (
            "\n"
            "document.querySelector('.abstract')?.click();\n"
            "return 'expanded';\n"
        )

        with patch(f"{__name__}.discover", new_callable=AsyncMock) as mock_discover:
            # Simulate expanded abstracts on article pages
            mock_discover.return_value = [
                WebContent(
                    url=f"{base_url}/article/1",
                    title="Article 1",
                    text="Full abstract: Detailed research information",
                    markdown="# Article 1\n\nFull abstract",
                    html="...",
                    script_result="expanded",
                ),
                WebContent(
                    url=f"{base_url}/article/2",
                    title="Article 2",
                    text="Full abstract: More research details",
                    markdown="# Article 2\n\nFull abstract",
                    html="...",
                    script_result="expanded",
                ),
            ]
            results = await discover(
                "research papers",
                content_script=content_script,
                max_pages=2,
            )

        mock_discover.assert_awaited_once_with(
            "research papers", content_script=content_script, max_pages=2
        )
        assert all(r.script_result == "expanded" for r in results)
        assert all("Full abstract" in r.text for r in results)

    @pytest.mark.asyncio
    async def test_discover_with_both_scripts(self, mock_server):
        """Test using both search and content scripts."""
        _, base_url = mock_server
        search_script = "document.querySelector('.show-more')?.click()"
        content_script = "document.querySelectorAll('.expand').forEach(e => e.click())"

        with patch(f"{__name__}.discover", new_callable=AsyncMock) as mock_discover:
            mock_discover.return_value = [
                WebContent(
                    url=f"{base_url}/article/1",
                    title="Enhanced Result 1",
                    text="Complete content with expanded sections",
                    markdown="# Enhanced Result 1",
                    html="...",
                ),
                WebContent(
                    url=f"{base_url}/article/2",
                    title="Enhanced Result 2",
                    text="Complete content with expanded sections",
                    markdown="# Enhanced Result 2",
                    html="...",
                ),
            ]
            results = await discover(
                "comprehensive search",
                script=search_script,
                content_script=content_script,
                max_pages=10,
            )

        mock_discover.assert_awaited_once_with(
            "comprehensive search",
            script=search_script,
            content_script=content_script,
            max_pages=10,
        )
        assert len(results) == 2
        assert all("Complete content" in r.text for r in results)
# Test Browser class JavaScript execution
class TestBrowserJavaScriptExecution:
    """Tests for Browser class execute_script and fetch_page script support.

    Playwright is replaced by ``AsyncMock`` objects; the shared wiring lives
    in the private helpers below so each test only states what is specific
    to its scenario.
    """

    @staticmethod
    def _browser_with_mock_page():
        """Return ``(browser, mock_page)`` with the Playwright layer mocked.

        The Browser's ``_browser`` attribute is replaced by an AsyncMock whose
        ``new_page()`` yields ``mock_page``, and ``_is_started`` is set so the
        tests do not go through ``start()``.
        """
        mock_page = AsyncMock()
        mock_page.goto = AsyncMock()
        mock_page.close = AsyncMock()

        mock_browser = AsyncMock()
        mock_browser.new_page.return_value = mock_page

        browser = Browser(BrowserConfig())
        browser._browser = mock_browser
        browser._is_started = True
        return browser, mock_page

    @staticmethod
    def _stub_successful_load(mock_page):
        """Configure *mock_page* to report a loaded page with status 200."""
        mock_page.set_viewport_size = AsyncMock()
        mock_page.content.return_value = "Test "
        mock_page.title.return_value = "Test Page"
        mock_response = AsyncMock()
        mock_response.status = 200
        mock_page.goto.return_value = mock_response

    @pytest.mark.asyncio
    async def test_execute_script_basic(self):
        """Test basic script execution."""
        browser, mock_page = self._browser_with_mock_page()
        mock_page.evaluate.return_value = {"result": "test"}

        result = await browser.execute_script(
            "https://example.com",
            "return {result: 'test'}",
        )

        assert result == {"result": "test"}
        mock_page.evaluate.assert_called_once_with("return {result: 'test'}")
        mock_page.close.assert_called_once()

    @pytest.mark.asyncio
    async def test_execute_script_dom_query(self):
        """Test DOM querying via script."""
        browser, mock_page = self._browser_with_mock_page()
        mock_page.evaluate.return_value = 5

        result = await browser.execute_script(
            "https://example.com",
            "document.querySelectorAll('div').length",
        )

        assert result == 5

    @pytest.mark.asyncio
    async def test_execute_script_async_js(self):
        """Test async JavaScript execution."""
        browser, mock_page = self._browser_with_mock_page()
        mock_page.evaluate.return_value = "delayed result"
        script = (
            "\n"
            "async () => {\n"
            "await new Promise(r => setTimeout(r, 100));\n"
            "return 'delayed result';\n"
            "}\n"
        )

        result = await browser.execute_script("https://example.com", script)

        assert result == "delayed result"

    @pytest.mark.asyncio
    async def test_execute_script_error(self):
        """Test script execution error handling."""
        browser, mock_page = self._browser_with_mock_page()
        mock_page.evaluate.side_effect = Exception(
            "Script error: undefined is not a function"
        )

        with pytest.raises(Exception) as exc_info:
            await browser.execute_script(
                "https://example.com",
                "nonexistent.function()",
            )

        # Checked after the block: statements following the raising call
        # inside ``pytest.raises`` would never execute.
        assert "undefined is not a function" in str(exc_info.value)

    @pytest.mark.asyncio
    async def test_execute_script_timeout(self):
        """Test script execution timeout."""
        browser, mock_page = self._browser_with_mock_page()
        mock_page.goto.side_effect = asyncio.TimeoutError("Navigation timeout")

        with pytest.raises(asyncio.TimeoutError):
            await browser.execute_script(
                "https://slow-site.com",
                "return true",
                timeout=1,
            )

    @pytest.mark.asyncio
    async def test_browser_execute_script_basic(self):
        """Test basic script execution (alias for compatibility)."""
        await self.test_execute_script_basic()

    @pytest.mark.asyncio
    async def test_browser_execute_script_error(self):
        """Test script execution error handling (alias for compatibility)."""
        await self.test_execute_script_error()

    @pytest.mark.asyncio
    async def test_browser_script_timeout(self):
        """Test script execution timeout (alias for compatibility)."""
        await self.test_execute_script_timeout()

    @pytest.mark.asyncio
    async def test_browser_fetch_page_with_scripts(self):
        """Test fetch_page with script_before and script_after parameters."""
        browser, mock_page = self._browser_with_mock_page()
        self._stub_successful_load(mock_page)

        # Record every evaluated script and answer by which one it is.
        script_calls = []

        def mock_evaluate(script):
            script_calls.append(script)
            if "before" in script:
                return {"before_result": "success"}
            elif "after" in script:
                return {"after_result": "complete"}
            return None

        mock_page.evaluate.side_effect = mock_evaluate

        # Test with both script_before and script_after
        result = await browser.fetch_page(
            "https://example.com",
            script_before="return {before: true}",
            script_after="return {after: true}",
        )

        # Verify the result structure
        assert result["url"] == "https://example.com"
        assert result["status"] == 200
        assert result["html"] == "Test "
        assert result["title"] == "Test Page"
        assert "script_result" in result
        assert "script_error" in result
        # Script result should contain both before and after results
        assert result["script_result"] == {
            "script_before": {"before_result": "success"},
            "script_after": {"after_result": "complete"},
        }
        assert result["script_error"] is None
        # Both scripts must have been evaluated exactly once each
        assert len(script_calls) == 2
        mock_page.evaluate.assert_any_call("return {before: true}")
        mock_page.evaluate.assert_any_call("return {after: true}")

    @pytest.mark.asyncio
    async def test_browser_fetch_page_script_before_only(self):
        """Test fetch_page with only script_before parameter."""
        browser, mock_page = self._browser_with_mock_page()
        self._stub_successful_load(mock_page)
        mock_page.evaluate.return_value = {"data": "extracted"}

        result = await browser.fetch_page(
            "https://example.com",
            script_before="return document.querySelector('h1').innerText",
        )

        assert result["script_result"] == {"data": "extracted"}
        assert result["script_error"] is None
        mock_page.evaluate.assert_called_once_with(
            "return document.querySelector('h1').innerText"
        )

    @pytest.mark.asyncio
    async def test_browser_fetch_page_script_error_handling(self):
        """Test fetch_page script error handling."""
        browser, mock_page = self._browser_with_mock_page()
        self._stub_successful_load(mock_page)
        mock_page.evaluate.side_effect = Exception("Script syntax error")

        result = await browser.fetch_page(
            "https://example.com",
            script_before="invalid javascript syntax %@#$",
        )

        assert result["script_result"] is None
        assert "Script execution error: Script syntax error" in result["script_error"]
        # Page should still load successfully
        assert result["status"] == 200
        assert result["html"] == "Test "

    @pytest.mark.asyncio
    async def test_browser_fetch_page_page_load_error_with_scripts(self):
        """Test fetch_page when page load fails but scripts were requested."""
        browser, mock_page = self._browser_with_mock_page()
        mock_page.goto.side_effect = Exception("Network error")
        mock_page.set_viewport_size = AsyncMock()

        result = await browser.fetch_page(
            "https://unreachable-site.com",
            script_before="return true",
        )

        # Should handle the error gracefully
        assert result["status"] == 0
        assert result["error"] == "Network error"
        assert result["script_result"] is None
        assert (
            "Page load failed, scripts not executed: Network error"
            in result["script_error"]
        )
# Test utilities and integration
class TestJavaScriptIntegration:
    """Integration-style checks for JavaScript execution through Browser."""

    @pytest.mark.asyncio
    async def test_real_browser_js_execution(self, mock_server):
        """Run a script against the mock server in a real browser.

        Skipped when Playwright cannot be imported.
        """
        server, base_url = mock_server
        # This test requires Playwright to be installed
        pytest.importorskip("playwright")
        from crawailer import Browser, BrowserConfig

        price_script = (
            "\n"
            "await new Promise(r => setTimeout(r, 500));\n"
            "return document.querySelector('.final-price')?.innerText;\n"
        )
        browser = Browser(BrowserConfig(headless=True))
        try:
            await browser.start()
            # Test dynamic price extraction
            result = await browser.execute_script(
                f"{base_url}/dynamic-price",
                price_script,
            )
            # Should get discounted price
            assert result is not None
            # Price should be discounted (80% of 99.99 = 79.99)
            assert "79.99" in result
        finally:
            await browser.close()

    @pytest.mark.asyncio
    async def test_performance_multiple_scripts(self):
        """Run ten execute_script calls concurrently against mocked pages."""
        browser = Browser(BrowserConfig())

        # Mock setup
        fake_page = AsyncMock()
        fake_page.evaluate.return_value = "result"
        fake_page.goto = AsyncMock()
        fake_page.close = AsyncMock()
        fake_playwright_browser = AsyncMock()
        fake_playwright_browser.new_page.return_value = fake_page
        browser._browser = fake_playwright_browser
        browser._is_started = True

        # Execute multiple scripts concurrently
        targets = [f"https://example.com/page{i}" for i in range(10)]
        pending = [
            browser.execute_script(target, "return document.title")
            for target in targets
        ]
        outcomes = await asyncio.gather(*pending)

        assert len(outcomes) == 10
        assert all(outcome == "result" for outcome in outcomes)
        # Verify all pages were closed
        assert fake_page.close.call_count == 10
# Test WebContent enhancements
class TestWebContentJavaScriptFields:
    """Checks for the JavaScript-related fields on WebContent."""

    def test_webcontent_with_script_result(self):
        """A supplied script_result is stored and script_error stays None."""
        page = WebContent(
            url="https://example.com",
            title="Test",
            text="Content",
            markdown="# Test",
            html="",
            script_result={"data": "value"},
        )
        assert page.script_result == {"data": "value"}
        assert page.script_error is None

    def test_webcontent_with_script_error(self):
        """A supplied script_error is stored and script_result stays None."""
        page = WebContent(
            url="https://example.com",
            title="Test",
            text="Content",
            markdown="# Test",
            html="",
            script_error="ReferenceError: x is not defined",
        )
        assert page.script_result is None
        assert "ReferenceError" in page.script_error

    def test_webcontent_serialization(self):
        """Selected fields, including script_result, survive json.dumps."""
        page = WebContent(
            url="https://example.com",
            title="Test",
            text="Content",
            markdown="# Test",
            html="",
            script_result=[1, 2, 3],
            script_error=None,
        )
        # Should be serializable
        encoded = json.dumps(
            {
                "url": page.url,
                "title": page.title,
                "script_result": page.script_result,
            }
        )
        assert "[1, 2, 3]" in encoded
if __name__ == "__main__":
    # Run this file's tests and hand pytest's exit code back to the shell, so
    # a failing run executed as a script does not exit with status 0.
    raise SystemExit(pytest.main([__file__, "-v"]))