Test infrastructure with conftest fixtures that mock run_shell_args/run_adb, enabling device-free testing across all 8 mixins. Fixed: the UI parser regex couldn't match hyphenated XML attributes (content-desc, resource-id), and the notification parser captured a trailing parenthesis in package names.
392 lines
15 KiB
Python
392 lines
15 KiB
Python
"""Tests for settings mixin (settings, toggles, notifications, clipboard, media)."""
|
|
|
|
import pytest
|
|
|
|
from src.mixins.settings import _MEDIA_KEYCODES, SettingsMixin
|
|
from tests.conftest import fail, ok
|
|
|
|
|
|
class TestSettingsGet:
    """Checks for ``settings_get`` using the mocked ``run_shell_args``."""

    async def test_valid(self, server):
        """A stdout of "1" is returned as the value and marked as existing."""
        server.run_shell_args.return_value = ok(stdout="1")

        outcome = await server.settings_get("global", "wifi_on")

        assert outcome["success"] is True
        assert outcome["value"] == "1"
        assert outcome["exists"] is True

    async def test_null_value(self, server):
        """A stdout of "null" yields value None and exists False, still a success."""
        server.run_shell_args.return_value = ok(stdout="null")

        outcome = await server.settings_get("global", "missing_key")

        assert outcome["success"] is True
        assert outcome["value"] is None
        assert outcome["exists"] is False

    async def test_invalid_namespace(self, server):
        """An unknown namespace fails with an "Invalid namespace" error."""
        outcome = await server.settings_get("invalid", "key")

        assert outcome["success"] is False
        assert "Invalid namespace" in outcome["error"]

    async def test_invalid_key(self, server):
        """A key containing a space and "!" fails with an "Invalid key" error."""
        outcome = await server.settings_get("global", "bad key!")

        assert outcome["success"] is False
        assert "Invalid key" in outcome["error"]

    async def test_all_namespaces_valid(self, server):
        """Each of system, global and secure is accepted as a namespace."""
        server.run_shell_args.return_value = ok(stdout="value")
        namespaces = ("system", "global", "secure")

        for namespace in namespaces:
            outcome = await server.settings_get(namespace, "test_key")
            assert outcome["success"] is True

    async def test_key_with_dots(self, server):
        """A dotted key name is accepted."""
        server.run_shell_args.return_value = ok(stdout="value")

        outcome = await server.settings_get("global", "wifi.scan_always_enabled")

        assert outcome["success"] is True
|
|
|
|
|
|
class TestSettingsPut:
    """Checks for ``settings_put``: validation, elicitation and dev-mode gating."""

    @pytest.mark.usefixtures("_dev_mode")
    async def test_write_and_verify(self, server, ctx):
        """A write whose follow-up shell reply echoes the value is verified."""
        # Two shell replies are queued; the second one carries "128", which
        # the result is expected to expose as ``readback``.
        first_reply = ok()
        second_reply = ok(stdout="128")
        server.run_shell_args.side_effect = [first_reply, second_reply]

        outcome = await server.settings_put(ctx, "system", "screen_brightness", "128")

        assert outcome["success"] is True
        assert outcome["readback"] == "128"
        assert outcome["verified"] is True

    @pytest.mark.usefixtures("_dev_mode")
    async def test_invalid_namespace(self, server, ctx):
        """An unknown namespace is rejected."""
        outcome = await server.settings_put(ctx, "bad", "key", "val")

        assert outcome["success"] is False

    @pytest.mark.usefixtures("_dev_mode")
    async def test_invalid_key(self, server, ctx):
        """A key containing ";" is rejected."""
        outcome = await server.settings_put(ctx, "global", "k;ey", "val")

        assert outcome["success"] is False

    @pytest.mark.usefixtures("_dev_mode")
    async def test_secure_namespace_elicits(self, server, ctx):
        """Writing to "secure" succeeds once accepted and logs a "secure" message."""
        ctx.set_elicit("accept", "Yes, write setting")
        first_reply = ok()
        second_reply = ok(stdout="val")
        server.run_shell_args.side_effect = [first_reply, second_reply]

        outcome = await server.settings_put(ctx, "secure", "key", "val")

        assert outcome["success"] is True
        # Verify elicitation happened: some recorded message mentions "secure".
        recorded = [msg for _, msg in ctx.messages]
        assert any("secure" in msg for msg in recorded)

    @pytest.mark.usefixtures("_dev_mode")
    async def test_secure_cancelled(self, server, ctx):
        """Answering the elicitation with "Cancel" fails and flags ``cancelled``."""
        ctx.set_elicit("accept", "Cancel")

        outcome = await server.settings_put(ctx, "secure", "key", "val")

        assert outcome["success"] is False
        assert outcome.get("cancelled") is True

    @pytest.mark.usefixtures("_no_dev_mode")
    async def test_requires_dev_mode(self, server, ctx):
        """Under the ``_no_dev_mode`` fixture the write is refused."""
        outcome = await server.settings_put(ctx, "system", "k", "v")

        assert outcome["success"] is False
|
|
|
|
|
|
class TestWifiToggle:
    """Checks for ``wifi_toggle`` in both directions plus dev-mode gating."""

    @pytest.mark.usefixtures("_dev_mode")
    async def test_enable(self, server):
        """Enabling reports action "enable" and is verified when "1" is read back."""
        replies = [ok(), ok(stdout="1")]
        server.run_shell_args.side_effect = replies

        outcome = await server.wifi_toggle(True)

        assert outcome["success"] is True
        assert outcome["action"] == "enable"
        assert outcome["verified"] is True

    @pytest.mark.usefixtures("_dev_mode")
    async def test_disable(self, server):
        """Disabling reports action "disable" and is verified when "0" is read back."""
        replies = [ok(), ok(stdout="0")]
        server.run_shell_args.side_effect = replies

        outcome = await server.wifi_toggle(False)

        assert outcome["action"] == "disable"
        assert outcome["verified"] is True

    @pytest.mark.usefixtures("_no_dev_mode")
    async def test_requires_dev_mode(self, server):
        """Under the ``_no_dev_mode`` fixture the toggle is refused."""
        outcome = await server.wifi_toggle(True)

        assert outcome["success"] is False
|
|
|
|
|
|
class TestBluetoothToggle:
    """Checks for ``bluetooth_toggle`` and its dev-mode gating."""

    @pytest.mark.usefixtures("_dev_mode")
    async def test_enable(self, server):
        """Enabling succeeds and reports action "enable"."""
        server.run_shell_args.return_value = ok()

        outcome = await server.bluetooth_toggle(True)

        assert outcome["success"] is True
        assert outcome["action"] == "enable"

    @pytest.mark.usefixtures("_no_dev_mode")
    async def test_requires_dev_mode(self, server):
        """Under the ``_no_dev_mode`` fixture the toggle is refused."""
        outcome = await server.bluetooth_toggle(False)

        assert outcome["success"] is False
|
|
|
|
|
|
class TestAirplaneModeToggle:
    """Checks for ``airplane_mode_toggle``: confirmation, warnings and gating."""

    @pytest.mark.usefixtures("_dev_mode")
    async def test_enable_usb_device(self, server, ctx):
        """An accepted confirmation enables airplane mode."""
        ctx.set_elicit("accept", "Yes, enable airplane mode")
        replies = [ok(), ok()]
        server.run_shell_args.side_effect = replies

        outcome = await server.airplane_mode_toggle(ctx, True)

        assert outcome["success"] is True
        assert outcome["airplane_mode"] is True

    @pytest.mark.usefixtures("_dev_mode")
    async def test_enable_network_device_warns(self, server, ctx):
        """With an ip:port device selected, a message containing "sever" is recorded."""
        server.set_current_device("10.20.0.25:5555")
        ctx.set_elicit("accept", "Yes, enable airplane mode")
        replies = [ok(), ok()]
        server.run_shell_args.side_effect = replies

        outcome = await server.airplane_mode_toggle(ctx, True)

        assert outcome["success"] is True
        # Should have warned about network disconnection
        severing_notes = [msg for _, msg in ctx.messages if "sever" in msg.lower()]
        assert severing_notes

    @pytest.mark.usefixtures("_dev_mode")
    async def test_cancelled(self, server, ctx):
        """Answering the confirmation with "Cancel" fails and flags ``cancelled``."""
        ctx.set_elicit("accept", "Cancel")

        outcome = await server.airplane_mode_toggle(ctx, True)

        assert outcome["success"] is False
        assert outcome.get("cancelled") is True

    @pytest.mark.usefixtures("_dev_mode")
    async def test_disable_no_elicitation(self, server, ctx):
        """Disabling succeeds without recording any "elicit"-level message."""
        replies = [ok(), ok()]
        server.run_shell_args.side_effect = replies

        outcome = await server.airplane_mode_toggle(ctx, False)

        assert outcome["success"] is True
        # No elicitation for disable
        elicit_entries = [msg for level, msg in ctx.messages if level == "elicit"]
        assert not elicit_entries

    @pytest.mark.usefixtures("_no_dev_mode")
    async def test_requires_dev_mode(self, server, ctx):
        """Under the ``_no_dev_mode`` fixture the toggle is refused."""
        outcome = await server.airplane_mode_toggle(ctx, True)

        assert outcome["success"] is False
|
|
|
|
|
|
class TestScreenBrightness:
    """Checks for ``screen_brightness``: happy path, range limits and gating."""

    @pytest.mark.usefixtures("_dev_mode")
    async def test_set(self, server):
        """Setting 128 succeeds and reports auto-brightness as off."""
        replies = [ok(), ok()]
        server.run_shell_args.side_effect = replies

        outcome = await server.screen_brightness(128)

        assert outcome["success"] is True
        assert outcome["brightness"] == 128
        assert outcome["auto_brightness"] is False

    @pytest.mark.usefixtures("_dev_mode")
    async def test_out_of_range(self, server):
        """A value of 300 is rejected with an error mentioning "0-255"."""
        outcome = await server.screen_brightness(300)

        assert outcome["success"] is False
        assert "0-255" in outcome["error"]

    @pytest.mark.usefixtures("_dev_mode")
    async def test_negative(self, server):
        """A negative value is rejected."""
        outcome = await server.screen_brightness(-1)

        assert outcome["success"] is False

    @pytest.mark.usefixtures("_no_dev_mode")
    async def test_requires_dev_mode(self, server):
        """Under the ``_no_dev_mode`` fixture the call is refused."""
        outcome = await server.screen_brightness(128)

        assert outcome["success"] is False
|
|
|
|
|
|
class TestScreenTimeout:
    """Checks for ``screen_timeout``: unit conversion and range limits."""

    @pytest.mark.usefixtures("_dev_mode")
    async def test_set(self, server):
        """30 seconds is accepted and also reported as 30000 milliseconds."""
        server.run_shell_args.return_value = ok()

        outcome = await server.screen_timeout(30)

        assert outcome["success"] is True
        assert outcome["timeout_seconds"] == 30
        assert outcome["timeout_ms"] == 30000

    @pytest.mark.usefixtures("_dev_mode")
    async def test_too_large(self, server):
        """A value of 9999 is rejected with an error mentioning "1-1800"."""
        outcome = await server.screen_timeout(9999)

        assert outcome["success"] is False
        assert "1-1800" in outcome["error"]

    @pytest.mark.usefixtures("_dev_mode")
    async def test_zero(self, server):
        """Zero is rejected."""
        outcome = await server.screen_timeout(0)

        assert outcome["success"] is False
|
|
|
|
|
|
class TestNotificationList:
    """Checks for ``notification_list`` parsing of mocked dumpsys output."""

    async def test_parse_notifications(self, server):
        """Two records are parsed; the first exposes its package, title and text."""
        dumpsys_output = """
NotificationRecord(0x1234 pkg=com.example.app)
extras {
android.title=Test Title
android.text=Test message body
}
postTime=1700000000000
NotificationRecord(0x5678 pkg=com.other.app)
extras {
android.title=Second
android.text=Another notification
}
postTime=1700000001000
"""
        server.run_shell_args.return_value = ok(stdout=dumpsys_output)

        outcome = await server.notification_list()

        assert outcome["success"] is True
        assert outcome["count"] == 2
        first = outcome["notifications"][0]
        assert first["package"] == "com.example.app"
        assert first["title"] == "Test Title"
        assert first["text"] == "Test message body"

    async def test_limit(self, server):
        """With ten records available, ``limit=3`` returns at most three."""
        # Build output with many notifications: a record line followed by a
        # title line for each of the ten fake apps.
        record_lines = [
            entry
            for idx in range(10)
            for entry in (
                f" NotificationRecord(0x{idx:04x} pkg=com.app{idx})",
                f" android.title=Title {idx}",
            )
        ]
        server.run_shell_args.return_value = ok(stdout="\n".join(record_lines))

        outcome = await server.notification_list(limit=3)

        assert outcome["count"] <= 3

    async def test_empty(self, server):
        """Empty stdout is a success with a count of zero."""
        server.run_shell_args.return_value = ok(stdout="")

        outcome = await server.notification_list()

        assert outcome["success"] is True
        assert outcome["count"] == 0
|
|
|
|
|
|
class TestClipboardGet:
    """Checks for ``clipboard_get`` over mocked parcel output."""

    async def test_parses_parcel(self, server):
        """Text embedded in a synthetic parcel is returned as ``text``."""
        # Build parcel programmatically with correct encoding
        parcel_dump = _build_parcel("hello world")
        server.run_shell_args.return_value = ok(stdout=parcel_dump)

        outcome = await server.clipboard_get()

        assert outcome["success"] is True
        assert outcome["text"] == "hello world"

    async def test_empty_clipboard(self, server):
        """A parcel holding only two zero words is reported as a failure."""
        parcel_dump = "Result: Parcel(00000000 00000000 '........')"
        server.run_shell_args.return_value = ok(stdout=parcel_dump)

        outcome = await server.clipboard_get()

        # No text/plain marker = not parseable
        assert outcome["success"] is False

    async def test_failure(self, server):
        """A failed shell call is reported as a failure."""
        server.run_shell_args.return_value = fail("error")

        outcome = await server.clipboard_get()

        assert outcome["success"] is False
|
|
|
|
|
|
def _build_parcel(text: str) -> str:
|
|
"""Build a fake service call parcel output containing clipboard text.
|
|
|
|
Mimics the format of `service call clipboard 4` output with a
|
|
ClipData Parcel containing text/plain MIME type and UTF-8 text.
|
|
"""
|
|
import struct
|
|
|
|
parts = []
|
|
# Status word (0 = success)
|
|
parts.append(struct.pack("<I", 0))
|
|
# Non-null marker
|
|
parts.append(struct.pack("<I", 1))
|
|
# ClipDescription: label length = 0 (no label)
|
|
parts.append(struct.pack("<I", 0))
|
|
# MIME type count = 1
|
|
parts.append(struct.pack("<I", 1))
|
|
# MIME type "text/plain" in UTF-16LE with length prefix
|
|
mime = "text/plain"
|
|
mime_utf16 = mime.encode("utf-16-le")
|
|
parts.append(struct.pack("<I", len(mime)))
|
|
parts.append(mime_utf16)
|
|
# Pad to 4-byte boundary
|
|
pad = (4 - len(mime_utf16) % 4) % 4
|
|
parts.append(b"\x00" * pad)
|
|
# Extras (none), timestamps, flags
|
|
parts.append(struct.pack("<I", 0xFFFFFFFF)) # no extras
|
|
parts.append(struct.pack("<I", 0xFFFFFFFF)) # no extras
|
|
parts.append(struct.pack("<I", 0)) # flags
|
|
# Item count = 1
|
|
parts.append(struct.pack("<I", 1))
|
|
# CharSequence type marker
|
|
parts.append(struct.pack("<I", 0))
|
|
# Text as length-prefixed UTF-8
|
|
text_bytes = text.encode("utf-8")
|
|
parts.append(struct.pack("<I", len(text_bytes)))
|
|
parts.append(text_bytes)
|
|
pad = (4 - len(text_bytes) % 4) % 4
|
|
parts.append(b"\x00" * pad)
|
|
|
|
data = b"".join(parts)
|
|
|
|
# Format as hex words like real parcel output
|
|
hex_lines = []
|
|
for i in range(0, len(data), 16):
|
|
chunk = data[i : i + 16]
|
|
words = []
|
|
for j in range(0, len(chunk), 4):
|
|
word = chunk[j : j + 4].ljust(4, b"\x00")
|
|
words.append(int.from_bytes(word, "little"))
|
|
hex_str = " ".join(f"{w:08x}" for w in words)
|
|
hex_lines.append(f" 0x{i:08x}: {hex_str} '...'")
|
|
|
|
return "Result: Parcel(\n" + "\n".join(hex_lines) + "\n)"
|
|
|
|
|
|
class TestParseClipboardParcel:
    """Exercise ``SettingsMixin._parse_clipboard_parcel`` without a server."""

    def test_valid_parcel(self):
        """Text placed in a synthetic parcel is recovered unchanged."""
        parcel_dump = _build_parcel("test data here")

        parsed = SettingsMixin._parse_clipboard_parcel(parcel_dump)

        assert parsed == "test data here"

    def test_nonzero_status(self):
        """A parcel whose only word is 00000001 parses to None."""
        parcel_dump = "Result: Parcel(00000001 '....')"

        parsed = SettingsMixin._parse_clipboard_parcel(parcel_dump)

        assert parsed is None

    def test_no_hex_words(self):
        """Input containing no hex words parses to None."""
        parsed = SettingsMixin._parse_clipboard_parcel("no hex here")

        assert parsed is None

    def test_no_mime_marker(self):
        """A parcel with just the two leading words parses to None."""
        parcel_dump = "Result: Parcel(00000000 00000001 '........')"

        parsed = SettingsMixin._parse_clipboard_parcel(parcel_dump)

        assert parsed is None

    def test_long_text(self):
        """A payload of ten repeated sentences is recovered unchanged."""
        long_text = "The quick brown fox jumps over the lazy dog. " * 10
        parcel_dump = _build_parcel(long_text)

        parsed = SettingsMixin._parse_clipboard_parcel(parcel_dump)

        assert parsed == long_text
|
|
|
|
|
|
class TestMediaControl:
    """Checks for ``media_control`` action lookup and input normalisation."""

    async def test_play(self, server):
        """"play" succeeds and reports KEYCODE_MEDIA_PLAY."""
        server.run_shell_args.return_value = ok()

        outcome = await server.media_control("play")

        assert outcome["success"] is True
        assert outcome["action"] == "play"
        assert outcome["keycode"] == "KEYCODE_MEDIA_PLAY"

    async def test_all_actions(self, server):
        """Every entry in ``_MEDIA_KEYCODES`` succeeds and reports its keycode."""
        server.run_shell_args.return_value = ok()

        for action_name, expected_keycode in _MEDIA_KEYCODES.items():
            outcome = await server.media_control(action_name)
            assert outcome["success"] is True
            assert outcome["keycode"] == expected_keycode

    async def test_case_insensitive(self, server):
        """An upper-case action is accepted and reported in lower case."""
        server.run_shell_args.return_value = ok()

        outcome = await server.media_control("PLAY")

        assert outcome["success"] is True
        assert outcome["action"] == "play"

    async def test_unknown_action(self, server):
        """An unsupported action fails with an error that mentions "play"."""
        outcome = await server.media_control("rewind")

        assert outcome["success"] is False
        error_text = outcome["error"]
        assert "Unknown action" in error_text
        assert "play" in error_text  # Lists available actions

    async def test_whitespace_stripped(self, server):
        """Surrounding spaces are ignored when resolving the action."""
        server.run_shell_args.return_value = ok()

        outcome = await server.media_control(" pause ")

        assert outcome["success"] is True
        assert outcome["action"] == "pause"
|