#!/usr/bin/env python # -*- coding: utf-8 -*- from wireviz.DataClasses import Connector, Cable from graphviz import Graph from wireviz import wv_colors, wv_helper, __version__, APP_NAME, APP_URL from wireviz.wv_colors import get_color_hex from wireviz.wv_helper import awg_equiv, mm2_equiv, tuplelist2tsv, \ nested_html_table, flatten2d, index_if_list, html_line_breaks, \ graphviz_line_breaks, remove_line_breaks, open_file_read, open_file_write, \ html_colorbar, html_image, html_caption, manufacturer_info_field, \ component_table_entry, \ calculate_qty_multiplier_connector, calculate_qty_multiplier_cable from collections import Counter from typing import List from pathlib import Path import re class Harness: def __init__(self): self.color_mode = 'SHORT' self.mini_bom_mode = True self.connectors = {} self.cables = {} self._bom = [] # Internal Cache for generated bom self.additional_bom_items = [] def add_connector(self, name: str, *args, **kwargs) -> None: self.connectors[name] = Connector(name, *args, **kwargs) def add_cable(self, name: str, *args, **kwargs) -> None: self.cables[name] = Cable(name, *args, **kwargs) def add_bom_item(self, item: dict) -> None: self.additional_bom_items.append(item) def connect(self, from_name: str, from_pin: (int, str), via_name: str, via_pin: (int, str), to_name: str, to_pin: (int, str)) -> None: for (name, pin) in zip([from_name, to_name], [from_pin, to_pin]): # check from and to connectors if name is not None and name in self.connectors: connector = self.connectors[name] if pin in connector.pins and pin in connector.pinlabels: if connector.pins.index(pin) == connector.pinlabels.index(pin): # TODO: Maybe issue a warning? It's not worthy of an exception if it's unambiguous, but maybe risky? pass else: raise Exception(f'{name}:{pin} is defined both in pinlabels and pins, for different pins.') if pin in connector.pinlabels: if connector.pinlabels.count(pin) > 1: raise Exception(f'{name}:{pin} is defined more than once.') else: index = connector.pinlabels.index(pin) pin = connector.pins[index] # map pin name to pin number if name == from_name: from_pin = pin if name == to_name: to_pin = pin if not pin in connector.pins: raise Exception(f'{name}:{pin} not found.') self.cables[via_name].connect(from_name, from_pin, via_pin, to_name, to_pin) if from_name in self.connectors: self.connectors[from_name].activate_pin(from_pin) if to_name in self.connectors: self.connectors[to_name].activate_pin(to_pin) def create_graph(self) -> Graph: dot = Graph() dot.body.append(f'// Graph generated by {APP_NAME} {__version__}') dot.body.append(f'// {APP_URL}') font = 'arial' dot.attr('graph', rankdir='LR', ranksep='2', bgcolor='white', nodesep='0.33', fontname=font) dot.attr('node', shape='record', style='filled', fillcolor='white', fontname=font) dot.attr('edge', style='bold', fontname=font) # prepare ports on connectors depending on which side they will connect for _, cable in self.cables.items(): for connection_color in cable.connections: if connection_color.from_port is not None: # connect to left self.connectors[connection_color.from_name].ports_right = True if connection_color.to_port is not None: # connect to right self.connectors[connection_color.to_name].ports_left = True for connector in self.connectors.values(): html = [] rows = [[connector.name if connector.show_name else None], [f'P/N: {connector.pn}' if connector.pn else None, html_line_breaks(manufacturer_info_field(connector.manufacturer, connector.mpn))], [html_line_breaks(connector.type), html_line_breaks(connector.subtype), f'{connector.pincount}-pin' if connector.show_pincount else None, connector.color, html_colorbar(connector.color)], '' if connector.style != 'simple' else None, [html_image(connector.image)], [html_caption(connector.image)]] if connector.additional_components is not None: rows.append(["Additional components"]) for extra in connector.additional_components: qty = extra.qty qty *= calculate_qty_multiplier_connector(extra.qty_multiplier, connector) if(self.mini_bom_mode): id = self.get_bom_index(extra.long_name(), extra.unit, extra.manufacturer, extra.mpn, extra.pn) rows.append(html_line_breaks(component_table_entry(f'{id} ({extra.type.capitalize()})', qty, extra.unit))) else: rows.append(html_line_breaks(extra.long_name(), qty, extra.get.unit, extra.pn, extra.manufacturer, extra.mpn)) rows.append([html_line_breaks(connector.notes)]) html.extend(nested_html_table(rows)) if connector.style != 'simple': pinhtml = [] pinhtml.append('') for pin, pinlabel in zip(connector.pins, connector.pinlabels): if connector.hide_disconnected_pins and not connector.visible_pins.get(pin, False): continue pinhtml.append(' ') if connector.ports_left: pinhtml.append(f' ') if pinlabel: pinhtml.append(f' ') if connector.ports_right: pinhtml.append(f' ') pinhtml.append(' ') pinhtml.append('
{pin}{pinlabel}{pin}
') html = [row.replace('', '\n'.join(pinhtml)) for row in html] html = '\n'.join(html) dot.node(connector.name, label=f'<\n{html}\n>', shape='none', margin='0', style='filled', fillcolor='white') if len(connector.loops) > 0: dot.attr('edge', color='#000000:#ffffff:#000000') if connector.ports_left: loop_side = 'l' loop_dir = 'w' elif connector.ports_right: loop_side = 'r' loop_dir = 'e' else: raise Exception('No side for loops') for loop in connector.loops: dot.edge(f'{connector.name}:p{loop[0]}{loop_side}:{loop_dir}', f'{connector.name}:p{loop[1]}{loop_side}:{loop_dir}') # determine if there are double- or triple-colored wires in the harness; # if so, pad single-color wires to make all wires of equal thickness pad = any(len(colorstr) > 2 for cable in self.cables.values() for colorstr in cable.colors) for cable in self.cables.values(): html = [] awg_fmt = '' if cable.show_equiv: # Only convert units we actually know about, i.e. currently # mm2 and awg --- other units _are_ technically allowed, # and passed through as-is. if cable.gauge_unit =='mm\u00B2': awg_fmt = f' ({awg_equiv(cable.gauge)} AWG)' elif cable.gauge_unit.upper() == 'AWG': awg_fmt = f' ({mm2_equiv(cable.gauge)} mm\u00B2)' rows = [[cable.name if cable.show_name else None], [f'P/N: {cable.pn}' if (cable.pn and not isinstance(cable.pn, list)) else None, html_line_breaks(manufacturer_info_field( cable.manufacturer if not isinstance(cable.manufacturer, list) else None, cable.mpn if not isinstance(cable.mpn, list) else None))], [html_line_breaks(cable.type), f'{cable.wirecount}x' if cable.show_wirecount else None, f'{cable.gauge} {cable.gauge_unit}{awg_fmt}' if cable.gauge else None, '+ S' if cable.shield else None, f'{cable.length} m' if cable.length > 0 else None, cable.color, html_colorbar(cable.color)], '', [html_image(cable.image)], [html_caption(cable.image)]] if len(cable.additional_components) > 0: rows.append(["Additional components"]) for extra in cable.additional_components: qty = extra.qty qty *= calculate_qty_multiplier_cable(extra.qty_multiplier, cable) if(self.mini_bom_mode): id = self.get_bom_index(extra.long_name(), extra.unit, extra.manufacturer, extra.mpn, extra.pn) rows.append(html_line_breaks(component_table_entry(f'{id} ({extra.type.capitalize()})', qty, extra.unit))) else: rows.append(html_line_breaks(component_table_entry(extra.long_name(), qty, extra.unit, extra.pn, extra.manufacturer, extra.mpn))) rows.append([html_line_breaks(cable.notes)]) html.extend(nested_html_table(rows)) wirehtml = [] wirehtml.append('') # conductor table wirehtml.append(' ') for i, connection_color in enumerate(cable.colors, 1): wirehtml.append(' ') wirehtml.append(f' ') wirehtml.append(f' ') wirehtml.append(f' ') wirehtml.append(' ') bgcolors = ['#000000'] + get_color_hex(connection_color, pad=pad) + ['#000000'] wirehtml.append(f' ') wirehtml.append(f' ') wirehtml.append(' ') if(cable.category == 'bundle'): # for bundles individual wires can have part information # create a list of wire parameters wireidentification = [] if isinstance(cable.pn, list): wireidentification.append(f'P/N: {cable.pn[i - 1]}') manufacturer_info = manufacturer_info_field( cable.manufacturer[i - 1] if isinstance(cable.manufacturer, list) else None, cable.mpn[i - 1] if isinstance(cable.mpn, list) else None) if manufacturer_info: wireidentification.append(html_line_breaks(manufacturer_info)) # print parameters into a table row under the wire if(len(wireidentification) > 0): wirehtml.append(' ') if cable.shield: wirehtml.append(' ') # spacer wirehtml.append(' ') wirehtml.append(' ') wirehtml.append(' ') wirehtml.append(' ') wirehtml.append(' ') if isinstance(cable.shield, str): # shield is shown with specified color and black borders shield_color_hex = wv_colors.get_color_hex(cable.shield)[0] attributes = f'height="6" bgcolor="{shield_color_hex}" border="2" sides="tb"' else: # shield is shown as a thin black wire attributes = f'height="2" bgcolor="#000000" border="0"' wirehtml.append(f' ') wirehtml.append(' ') wirehtml.append('
 
{wv_colors.translate_color(connection_color, self.color_mode)}
') wirehtml.append(' ') for j, bgcolor in enumerate(bgcolors[::-1]): # Reverse to match the curved wires when more than 2 colors wirehtml.append(f' ') wirehtml.append('
') wirehtml.append('
') wirehtml.append(' ') for attrib in wireidentification: wirehtml.append(f' ') wirehtml.append('
{attrib}
') wirehtml.append('
 
Shield
 
') html = [row.replace('', '\n'.join(wirehtml)) for row in html] # connections for connection_color in cable.connections: if isinstance(connection_color.via_port, int): # check if it's an actual wire and not a shield dot.attr('edge', color=':'.join(['#000000'] + wv_colors.get_color_hex(cable.colors[connection_color.via_port - 1], pad=pad) + ['#000000'])) else: # it's a shield connection # shield is shown with specified color and black borders, or as a thin black wire otherwise dot.attr('edge', color=':'.join(['#000000', shield_color_hex, '#000000']) if isinstance(cable.shield, str) else '#000000') if connection_color.from_port is not None: # connect to left from_port = f':p{connection_color.from_port}r' if self.connectors[connection_color.from_name].style != 'simple' else '' code_left_1 = f'{connection_color.from_name}{from_port}:e' code_left_2 = f'{cable.name}:w{connection_color.via_port}:w' dot.edge(code_left_1, code_left_2) from_string = f'{connection_color.from_name}:{connection_color.from_port}' if self.connectors[connection_color.from_name].show_name else '' html = [row.replace(f'', from_string) for row in html] if connection_color.to_port is not None: # connect to right code_right_1 = f'{cable.name}:w{connection_color.via_port}:e' to_port = f':p{connection_color.to_port}l' if self.connectors[connection_color.to_name].style != 'simple' else '' code_right_2 = f'{connection_color.to_name}{to_port}:w' dot.edge(code_right_1, code_right_2) to_string = f'{connection_color.to_name}:{connection_color.to_port}' if self.connectors[connection_color.to_name].show_name else '' html = [row.replace(f'', to_string) for row in html] html = '\n'.join(html) dot.node(cable.name, label=f'<\n{html}\n>', shape='box', style='filled,dashed' if cable.category == 'bundle' else '', margin='0', fillcolor='white') return dot @property def png(self): from io import BytesIO graph = self.create_graph() data = BytesIO() data.write(graph.pipe(format='png')) data.seek(0) return data.read() @property def svg(self): from io import BytesIO graph = self.create_graph() data = BytesIO() data.write(graph.pipe(format='svg')) data.seek(0) return data.read() def output(self, filename: (str, Path), view: bool = False, cleanup: bool = True, fmt: tuple = ('pdf', )) -> None: # graphical output graph = self.create_graph() for f in fmt: graph.format = f graph.render(filename=filename, view=view, cleanup=cleanup) graph.save(filename=f'{filename}.gv') # bom output bom_list = self.bom_list() with open_file_write(f'{filename}.bom.tsv') as file: file.write(tuplelist2tsv(bom_list)) # HTML output with open_file_write(f'{filename}.html') as file: file.write('\n') file.write('\n') file.write(' \n') file.write(f' \n') file.write(f' {APP_NAME} Diagram and BOM\n') file.write('\n') file.write('

Diagram

') with open_file_read(f'{filename}.svg') as svg: file.write(re.sub( '^<[?]xml [^?>]*[?]>[^<]*]*>', '', svg.read(1024), 1)) for svgdata in svg: file.write(svgdata) file.write('

Bill of Materials

') listy = flatten2d(bom_list) file.write('') file.write('') for item in listy[0]: file.write(f'') file.write('') for row in listy[1:]: file.write('') for i, item in enumerate(row): item_str = item.replace('\u00b2', '²') align = 'text-align:right; ' if listy[0][i] == 'Qty' else '' file.write(f'') file.write('') file.write('
{item}
{item_str}
') file.write('') def bom(self): # if the bom has previously been generated then return the generated bom if len(self._bom) > 0: return self._bom bom_items = [] # connectors for connector in self.connectors.values(): if not connector.ignore_in_bom: conn_type = f', {remove_line_breaks(connector.type)}' if connector.type else '' conn_subtype = f', {remove_line_breaks(connector.subtype)}' if connector.subtype else '' conn_pincount = f', {connector.pincount} pins' if connector.style != 'simple' else '' conn_color = f', {connector.color}' if connector.color else '' name = f'Connector{conn_type}{conn_subtype}{conn_pincount}{conn_color}' item = {'item': name, 'qty': 1, 'unit': '', 'designators': connector.name if connector.show_name else None, 'manufacturer': remove_line_breaks(connector.manufacturer), 'mpn': remove_line_breaks(connector.mpn), 'pn': connector.pn} bom_items.append(item) for part in connector.additional_components: qty = part.qty qty *= calculate_qty_multiplier_connector(part.qty_multiplier, connector) bom_items.append( { 'item': part.long_name(), 'qty': qty, 'unit': part.unit, 'manufacturer': part.manufacturer, 'mpn': part.mpn, 'pn': part.pn, 'designators': connector.name if connector.show_name else None } ) # cables # TODO: If category can have other non-empty values than 'bundle', maybe it should be part of item name? for cable in self.cables.values(): if not cable.ignore_in_bom: # create name beilds used by both cables and bundles cable_type = f', {remove_line_breaks(cable.type)}' if cable.type else '' gauge_name = f' x {cable.gauge} {cable.gauge_unit}' if cable.gauge else ' wires' if cable.category != 'bundle': # process cable as a single entity shield_name = ' shielded' if cable.shield else '' name = f'Cable{cable_type}, {cable.wirecount}{gauge_name}{shield_name}' item = {'item': name, 'qty': cable.length, 'unit': 'm', 'designators': cable.name, 'manufacturer': remove_line_breaks(cable.manufacturer), 'mpn': remove_line_breaks(cable.mpn), 'pn': cable.pn} bom_items.append(item) else: # add each wire from the bundle to the bom for index, color in enumerate(cable.colors, 0): gauge_color = f', {color}' if color else '' name = f'Wire{cable_type}{gauge_name}{gauge_color}' item = {'item': name, 'qty': cable.length, 'unit': 'm', 'designators': cable.name, 'manufacturer': remove_line_breaks(index_if_list(cable.manufacturer, index)), 'mpn': remove_line_breaks(index_if_list(cable.mpn, index)), 'pn': index_if_list(cable.pn, index)} bom_items.append(item) for part in cable.additional_components: qty = part.qty qty *= calculate_qty_multiplier_cable(part.qty_multiplier, cable) bom_items.append( { 'item': part.long_name(), 'qty': qty, 'unit': part.unit, 'manufacturer': part.manufacturer, 'mpn': part.mpn, 'pn': part.pn, 'designators': cable.name } ) for item in self.additional_bom_items: name = item['description'] if item.get('description', None) else '' item = {'item': name, 'qty': item.get('qty'), 'unit': item.get('unit'), 'designators': item.get('designators'), 'manufacturer': item.get('manufacturer'), 'mpn': item.get('mpn'), 'pn': item.get('pn')} bom_items.append(item) # deduplicate bom bom_types_group = lambda bt: (bt['item'], bt['unit'], bt['manufacturer'], bt['mpn'], bt['pn']) for group in Counter([bom_types_group(v) for v in bom_items]): items = [v for v in bom_items if bom_types_group(v) == group] shared = items[0] designators = [] for item in items: if item.get('designators'): if isinstance(item['designators'], List): designators.extend(item['designators']) else: designators.append(item['designators']) designators = list(dict.fromkeys(designators)) # remove duplicates designators.sort() total_qty = sum(i['qty'] for i in items) item = {'item': shared['item'], 'qty': round(total_qty, 3), 'unit': shared['unit'], 'designators': designators, 'manufacturer': shared['manufacturer'], 'mpn': shared['mpn'], 'pn': shared['pn']} self._bom.append(item) self._bom = sorted(self._bom, key=lambda k: k['item']) # sort list of dicts by their values (https://stackoverflow.com/a/73050) # add index index = 1 for item in self._bom: item["id"] = index index += 1 return self._bom def get_bom_index(self, item, unit, manufacturer, mpn, pn): for bom_item in self.bom(): if((bom_item['item'], bom_item['unit'], bom_item['manufacturer'], bom_item['mpn'], bom_item['pn']) == (item, unit, manufacturer, mpn, pn)): return bom_item['id'] return None def bom_list(self): bom = self.bom() keys = ['id', 'item', 'qty', 'unit', 'designators'] # these BOM columns will always be included for fieldname in ['pn', 'manufacturer', 'mpn']: # these optional BOM columns will only be included if at least one BOM item actually uses them if any(entry.get(fieldname) for entry in bom): keys.append(fieldname) bom_list = [] # list of staic bom header names, headers not specified here are generated by capitilising the internal name bom_headings = { "pn": "P/N", "mpn": "MPN" } bom_list.append([(bom_headings[k] if k in bom_headings else k.capitalize()) for k in keys]) # create header row with keys for item in bom: item_list = [item.get(key, '') for key in keys] # fill missing values with blanks item_list = [', '.join(subitem) if isinstance(subitem, List) else subitem for subitem in item_list] # convert any lists into comma separated strings item_list = ['' if subitem is None else subitem for subitem in item_list] # if a field is missing for some (but not all) BOM items bom_list.append(item_list) return bom_list