import re from itertools import count from gnuradio.grc.core.blocks.block import Block from gnuradio.grc.core.Connection import Connection from gnuradio.grc.core.params.param import Param from gnuradio.grc.core.ports.port import Port from pydantic import BaseModel from gnuradio_mcp.models import ( SINK, SOURCE, BlockModel, ConnectionModel, DirectionType, ErrorModel, ParamModel, PortModel, ) def get_unique_id(flowgraph_blocks, base_id=""): block_ids = set(b.name for b in flowgraph_blocks) for index in count(): block_id = "{}_{}".format(base_id, index) if block_id not in block_ids: break return block_id def format_error_message(elem, msg) -> ErrorModel: msg = re.sub("[^A-Za-z0-9]+", " ", msg).strip() model: BaseModel match (elem): case Connection(): model = ConnectionModel.from_connection(elem) case Param(): model = ParamModel.from_param(elem) case Port(): model = PortModel.from_port(elem) case Block(): model = BlockModel.from_block(elem) case _: raise ValueError(f"Unsupported element type: {type(elem)}") return ErrorModel( type=type(model).__name__, key=model, # type: ignore message=msg, ) def get_port_by_key_in_port_list(port_list: list[Port], key: str) -> Block: for port in port_list: if port.key == key: return port raise ValueError(f"Port not found: {key}") def get_port_by_key( flowgraph, block_name: str, port_name: str, direction: DirectionType ) -> Port: block = flowgraph.get_block(block_name) if direction == SOURCE: return get_port_by_key_in_port_list(block.sources, port_name) elif direction == SINK: return get_port_by_key_in_port_list(block.sinks, port_name) else: raise ValueError(f"Invalid port direction: {direction}") def get_port_from_port_model(flowgraph, port_model: PortModel) -> Port: block_from_port_model = flowgraph.get_block(port_model.parent) return get_port_by_key( flowgraph, block_from_port_model.name, port_model.key, port_model.direction )