The lru_cache on get_block() caused three cache coherence bugs: 1. Removed blocks remained accessible — set_block_params would modify a detached block object that no longer exists in the flowgraph, so save_flowgraph would serialize stale state. 2. Re-created blocks with the same auto-generated name would return the cached (removed) block instead of the new one. 3. Renamed blocks (via set_block_params id=...) left phantom cache entries under the old name. Fix: always look up blocks from the live flowgraph. Block lookup is a linear scan of typically <20 blocks — no cache needed. Raises KeyError instead of StopIteration for missing blocks.
194 lines
6.6 KiB
Python
194 lines
6.6 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Generator
|
|
|
|
import pytest
|
|
|
|
from gnuradio_mcp.middlewares.block import BlockMiddleware
|
|
from gnuradio_mcp.middlewares.flowgraph import FlowGraphMiddleware
|
|
from gnuradio_mcp.middlewares.platform import PlatformMiddleware
|
|
from gnuradio_mcp.models import BlockModel, ConnectionModel, ErrorModel
|
|
|
|
|
|
@pytest.fixture
|
|
def flowgraph_middleware(platform_middleware: PlatformMiddleware):
|
|
return platform_middleware.make_flowgraph()
|
|
|
|
|
|
@pytest.fixture
|
|
def initial_blocks(flowgraph_middleware: FlowGraphMiddleware):
|
|
return [
|
|
BlockModel.from_block(block) for block in flowgraph_middleware._flowgraph.blocks
|
|
]
|
|
|
|
|
|
def test_flowgraph_block_addition_and_removal(
|
|
flowgraph_middleware: FlowGraphMiddleware,
|
|
initial_blocks: list[BlockModel],
|
|
block_key: str,
|
|
):
|
|
explicit_name = "my_custom_block_name"
|
|
|
|
flowgraph_middleware.add_block(block_key, explicit_name)
|
|
|
|
blocks = flowgraph_middleware.blocks
|
|
assert all(b in blocks for b in initial_blocks)
|
|
assert any(b.name == explicit_name for b in blocks)
|
|
|
|
flowgraph_middleware.remove_block(explicit_name)
|
|
|
|
blocks = flowgraph_middleware.blocks
|
|
assert all(b in initial_blocks for b in blocks)
|
|
|
|
|
|
def test_flowgraph_initial_state(
|
|
flowgraph_middleware: FlowGraphMiddleware,
|
|
initial_blocks: list[BlockModel],
|
|
):
|
|
assert flowgraph_middleware.blocks == initial_blocks
|
|
|
|
|
|
def test_block_naming(flowgraph_middleware: FlowGraphMiddleware, block_key: str):
|
|
explicit_name = "my_custom_block_name"
|
|
|
|
block = flowgraph_middleware.add_block(block_key, explicit_name)
|
|
|
|
assert block.name == explicit_name
|
|
|
|
|
|
def test_block_unique_names_for_same_type(
|
|
flowgraph_middleware: FlowGraphMiddleware, block_key: str
|
|
):
|
|
first_block = flowgraph_middleware.add_block(block_key)
|
|
second_block = flowgraph_middleware.add_block(block_key)
|
|
|
|
assert first_block.name != second_block.name
|
|
|
|
|
|
def test_block_default_name(flowgraph_middleware: FlowGraphMiddleware, block_key: str):
|
|
block_model = flowgraph_middleware.add_block(block_key)
|
|
assert block_key in block_model.name
|
|
|
|
|
|
def test_remove_existing_block(flowgraph_middleware: FlowGraphMiddleware):
|
|
DEFAULT_VARIABLE_BLOCK_NAME = "samp_rate"
|
|
flowgraph_middleware.remove_block(DEFAULT_VARIABLE_BLOCK_NAME)
|
|
assert not any(
|
|
block.name == DEFAULT_VARIABLE_BLOCK_NAME
|
|
for block in flowgraph_middleware.blocks
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"block_key, sinks_number, sources_number",
|
|
[("blocks_add_xx", 2, 1), ("blocks_copy", 1, 1), ("blocks_selector", 2, 2)],
|
|
)
|
|
def test_block_connections(
|
|
flowgraph_middleware: FlowGraphMiddleware,
|
|
block_key: str,
|
|
sources_number: int,
|
|
sinks_number: int,
|
|
):
|
|
source_block, dest_block = create_and_connect_blocks(
|
|
flowgraph_middleware, block_key, block_key
|
|
)
|
|
|
|
for connection in util_iter_possible_connections(source_block, dest_block):
|
|
connections = flowgraph_middleware.get_connections()
|
|
assert any(
|
|
c.source.key == connection.source.key and c.sink.key == connection.sink.key
|
|
for c in connections
|
|
)
|
|
|
|
assert len(flowgraph_middleware.get_connections()) == sources_number * sinks_number
|
|
|
|
|
|
def test_block_disconnection(flowgraph_middleware: FlowGraphMiddleware, block_key: str):
|
|
source_block, dest_block = create_and_connect_blocks(
|
|
flowgraph_middleware, block_key, block_key
|
|
)
|
|
|
|
for connection in util_iter_possible_connections(source_block, dest_block):
|
|
flowgraph_middleware.disconnect_blocks(connection.source, connection.sink)
|
|
connections = flowgraph_middleware.get_connections()
|
|
assert not any(
|
|
c.source.key == connection.source.key and c.sink.key == connection.sink.key
|
|
for c in connections
|
|
)
|
|
|
|
assert len(flowgraph_middleware.get_connections()) == 0
|
|
|
|
|
|
def test_get_block_after_removal_raises(flowgraph_middleware: FlowGraphMiddleware):
|
|
"""Removed blocks must not be accessible — prevents stale references."""
|
|
model = flowgraph_middleware.add_block("analog_sig_source_x")
|
|
flowgraph_middleware.remove_block(model.name)
|
|
with pytest.raises(KeyError):
|
|
flowgraph_middleware.get_block(model.name)
|
|
|
|
|
|
def test_recreated_block_gets_fresh_state(flowgraph_middleware: FlowGraphMiddleware):
|
|
"""Re-creating a block after removal must return the new instance."""
|
|
model = flowgraph_middleware.add_block("analog_sig_source_x")
|
|
block_mw = flowgraph_middleware.get_block(model.name)
|
|
block_mw.set_params({"freq": "99999"})
|
|
|
|
flowgraph_middleware.remove_block(model.name)
|
|
model2 = flowgraph_middleware.add_block("analog_sig_source_x", model.name)
|
|
|
|
block_mw2 = flowgraph_middleware.get_block(model2.name)
|
|
freq_param = next(p for p in block_mw2.params if p.key == "freq")
|
|
# New block should have default value, not the one we set before removal
|
|
assert freq_param.value != "99999"
|
|
|
|
|
|
def test_renamed_block_accessible_by_new_name(
|
|
flowgraph_middleware: FlowGraphMiddleware,
|
|
):
|
|
"""After renaming via set_params(id=...), the new name must resolve."""
|
|
model = flowgraph_middleware.add_block("variable")
|
|
old_name = model.name
|
|
block_mw = flowgraph_middleware.get_block(old_name)
|
|
block_mw.set_params({"id": "my_var", "value": "42"})
|
|
|
|
# New name works
|
|
new_block = flowgraph_middleware.get_block("my_var")
|
|
val = next(p for p in new_block.params if p.key == "value")
|
|
assert val.value == "42"
|
|
|
|
# Old name is gone
|
|
with pytest.raises(KeyError):
|
|
flowgraph_middleware.get_block(old_name)
|
|
|
|
|
|
def test_default_flowgraph_errors(flowgraph_middleware: FlowGraphMiddleware):
|
|
for error in flowgraph_middleware.get_all_errors():
|
|
assert isinstance(error, ErrorModel)
|
|
assert len(flowgraph_middleware.get_all_errors()) == 0
|
|
|
|
|
|
def create_and_connect_blocks(
|
|
flowgraph_middleware: FlowGraphMiddleware,
|
|
source_block_key: str,
|
|
dest_block_key: str,
|
|
):
|
|
source_block_model = flowgraph_middleware.add_block(source_block_key)
|
|
dest_block_model = flowgraph_middleware.add_block(dest_block_key)
|
|
|
|
source_block = flowgraph_middleware.get_block(source_block_model.name)
|
|
dest_block = flowgraph_middleware.get_block(dest_block_model.name)
|
|
|
|
for connection in util_iter_possible_connections(source_block, dest_block):
|
|
flowgraph_middleware.connect_blocks(connection.source, connection.sink)
|
|
|
|
return source_block, dest_block
|
|
|
|
|
|
def util_iter_possible_connections(
|
|
source_block: BlockMiddleware,
|
|
dest_block: BlockMiddleware,
|
|
) -> Generator[ConnectionModel]:
|
|
for sink in source_block.sinks:
|
|
for source in dest_block.sources:
|
|
yield ConnectionModel(source=source, sink=sink)
|