From 9ba17ef621de1221a937d05c9b56c6dea80e1e24 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Fri, 13 Feb 2026 00:37:19 -0700 Subject: [PATCH] Address code review findings for YAML-in-PNG feature MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Critical fixes: - C1: Preserve existing PNG text metadata when embedding YAML - C2: Exception handling — metadata failure no longer crashes parse() - C3: Atomic write via tempfile+rename prevents corruption Important fixes: - I1: Size guard (10MB) on read_yaml_from_png for untrusted PNGs - I2: Existence check and meaningful errors in read_yaml_from_png - I3: ParsedInput NamedTuple replaces raw 3-tuple return - I4: Normalize output_formats to tuple to prevent substring matching - I5: 19-test suite covering round-trip, edge cases, and error paths Also: removed dead prepend_input parameter (S3), added version tag (S1). --- src/wireviz/wireviz.py | 46 +++++-- src/wireviz/wv_png_metadata.py | 148 ++++++++++++++++------ tests/__init__.py | 0 tests/test_png_metadata.py | 216 +++++++++++++++++++++++++++++++++ 4 files changed, 362 insertions(+), 48 deletions(-) create mode 100644 tests/__init__.py create mode 100644 tests/test_png_metadata.py diff --git a/src/wireviz/wireviz.py b/src/wireviz/wireviz.py index 5706ad6..dcbe677 100755 --- a/src/wireviz/wireviz.py +++ b/src/wireviz/wireviz.py @@ -5,7 +5,7 @@ import platform import sys from errno import EINVAL, ENAMETOOLONG from pathlib import Path -from typing import Any, Dict, List, Tuple, Union +from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Union import yaml @@ -26,6 +26,13 @@ from wireviz.wv_utils import ( from . import APP_NAME +class ParsedInput(NamedTuple): + """Result of parsing the input argument to parse().""" + data: Dict # parsed YAML as a dict + source_path: Optional[Path] # file path if input was a file, else None + raw_yaml: Optional[str] # raw YAML string, None when input was a Dict + + def parse( inp: Union[Path, str, Dict], return_types: Union[None, str, Tuple[str]] = None, @@ -89,21 +96,28 @@ def parse( if not output_formats and not return_types: raise Exception("No output formats or return types specified") - yaml_data, yaml_file, yaml_str = _get_yaml_data_and_path(inp) - if not isinstance(yaml_data, dict): + # normalize output_formats to a tuple to prevent substring matching bugs + # ("png" in "pngsvg" == True, but "png" in ("pngsvg",) == False) + if isinstance(output_formats, str): + output_formats = (output_formats,) + + parsed = _parse_input(inp) + if not isinstance(parsed.data, dict): raise TypeError( - f"Expected a dict as top-level YAML input, but got: {type(yaml_data)}" + f"Expected a dict as top-level YAML input, but got: {type(parsed.data)}" ) + yaml_data = parsed.data + if output_formats: # need to write data to file, determine output directory and filename - output_dir = _get_output_dir(yaml_file, output_dir) - output_name = _get_output_name(yaml_file, output_name) + output_dir = _get_output_dir(parsed.source_path, output_dir) + output_name = _get_output_name(parsed.source_path, output_name) output_file = output_dir / output_name - if yaml_file: + if parsed.source_path: # if reading from file, ensure that input file's parent directory # is included in image_paths - default_image_path = yaml_file.parent.resolve() + default_image_path = parsed.source_path.parent.resolve() if not default_image_path in [Path(x).resolve() for x in image_paths]: image_paths.append(default_image_path) @@ -396,8 +410,9 @@ def parse( if output_formats: harness.output(filename=output_file, fmt=output_formats, view=False) # embed YAML source into PNG for round-trip capability - if "png" in output_formats and yaml_str: - save_yaml_to_png(output_file, yaml_str) + # failure here is non-fatal — PNG diagram output remains valid + if "png" in output_formats and parsed.raw_yaml: + save_yaml_to_png(output_file, parsed.raw_yaml) if return_types: returns = [] @@ -417,8 +432,13 @@ def parse( return tuple(returns) if len(returns) != 1 else returns[0] -def _get_yaml_data_and_path(inp: Union[str, Path, Dict]) -> Tuple[Dict, Path, str]: - # determine whether inp is a file path, a YAML string, or a Dict +def _parse_input(inp: Union[str, Path, Dict]) -> ParsedInput: + """Parse the input argument into YAML data, source path, and raw string. + + When inp is a Dict, source_path and raw_yaml will be None. + When inp is a YAML string, source_path will be None. + When inp is a file path, all three fields will be populated. + """ if not isinstance(inp, Dict): # received a str or a Path try: yaml_path = Path(inp).expanduser().resolve(strict=True) @@ -446,7 +466,7 @@ def _get_yaml_data_and_path(inp: Union[str, Path, Dict]) -> Tuple[Dict, Path, st yaml_data = inp yaml_path = None yaml_str = None - return yaml_data, yaml_path, yaml_str + return ParsedInput(data=yaml_data, source_path=yaml_path, raw_yaml=yaml_str) def _get_output_dir(input_file: Path, default_output_dir: Path) -> Path: diff --git a/src/wireviz/wv_png_metadata.py b/src/wireviz/wv_png_metadata.py index f4a0445..c73e6f2 100644 --- a/src/wireviz/wv_png_metadata.py +++ b/src/wireviz/wv_png_metadata.py @@ -1,63 +1,141 @@ # -*- coding: utf-8 -*- -"""Embed and extract YAML source data in PNG metadata (iTXT chunks).""" +"""Embed and extract YAML source data in PNG metadata (iTXT chunks). +PNGs support arbitrary text metadata via iTXT chunks. We use compressed +iTXT to store the original YAML harness definition, enabling round-trip +workflows: share a PNG → extract the YAML → regenerate or edit. + +Security note: YAML extracted from untrusted PNGs must be parsed with +yaml.safe_load() and validated before use. A size limit is enforced +on extraction to guard against decompression bombs. +""" + +import logging +import os +import tempfile from pathlib import Path from typing import Optional, Tuple from PIL import Image from PIL.PngImagePlugin import PngInfo +logger = logging.getLogger(__name__) PNG_KEY_YAML = "wireviz_yaml" -PNG_KEY_PREPEND = "wireviz_prepend_yaml" +PNG_KEY_VERSION = "wireviz_meta_version" +META_VERSION = "1" + +# 10 MB — no harness definition should approach this +MAX_YAML_SIZE = 10 * 1024 * 1024 -def save_yaml_to_png( - png_path: Path, - yaml_input: str, - prepend_input: str = "", -) -> None: - """Save YAML source as compressed iTXT metadata in a PNG file.""" - png_path = Path(png_path) - if not png_path.suffix == ".png": - png_path = png_path.with_suffix(".png") +def _resolve_png_path(path: Path) -> Path: + """Ensure path has .png suffix.""" + path = Path(path) + if path.suffix != ".png": + path = path.with_suffix(".png") + return path + + +def save_yaml_to_png(png_path: Path, yaml_input: str) -> bool: + """Save YAML source as compressed iTXT metadata in a PNG file. + + Preserves any existing text metadata in the PNG. Uses atomic + write-to-temp-then-rename to prevent corruption on interrupted writes. + + Returns True if metadata was successfully embedded, False otherwise. + Failure is non-fatal — the PNG diagram output remains valid. + """ + png_path = _resolve_png_path(png_path) if not png_path.exists(): - return + logger.warning("Cannot embed YAML metadata: PNG not found: %s", png_path) + return False - with Image.open(fp=png_path) as im: - txt = PngInfo() - txt.add_itxt(PNG_KEY_YAML, yaml_input, zip=True) - if prepend_input: - txt.add_itxt(PNG_KEY_PREPEND, prepend_input, zip=True) - im.save(fp=png_path, pnginfo=txt) + try: + with Image.open(fp=png_path) as im: + im.load() # force full read before we overwrite + + # preserve existing text metadata + txt = PngInfo() + for key, value in im.text.items(): + if key not in (PNG_KEY_YAML, PNG_KEY_VERSION): + txt.add_text(key, value) + + txt.add_itxt(PNG_KEY_YAML, yaml_input, zip=True) + txt.add_text(PNG_KEY_VERSION, META_VERSION) + + # atomic write: temp file → rename + fd, tmp_path = tempfile.mkstemp( + suffix=".png", dir=png_path.parent + ) + try: + os.close(fd) + im.save(fp=tmp_path, pnginfo=txt) + Path(tmp_path).replace(png_path) + except Exception: + Path(tmp_path).unlink(missing_ok=True) + raise + + return True + + except Exception as e: + logger.warning("Failed to embed YAML metadata in %s: %s", png_path, e) + return False -def read_yaml_from_png(png_path: Path) -> Tuple[str, Optional[str]]: +def read_yaml_from_png(png_path: Path) -> str: """Extract YAML source from a PNG file's iTXT metadata. - Returns (yaml_input, prepend_input) where prepend_input may be None. + Returns the embedded YAML string. + + Raises: + FileNotFoundError: If the PNG file does not exist. + ValueError: If the PNG has no WireViz metadata, is corrupt, + or the embedded data exceeds the size limit. """ - png_path = Path(png_path) - if not png_path.suffix == ".png": - png_path = png_path.with_suffix(".png") + png_path = _resolve_png_path(png_path) + if not png_path.exists(): + raise FileNotFoundError( + f"Cannot read YAML metadata: PNG not found: {png_path}" + ) - with Image.open(fp=png_path) as im: - im.load() - yaml_input = im.text.get(PNG_KEY_YAML, "") - prepend_input = im.text.get(PNG_KEY_PREPEND) + try: + with Image.open(fp=png_path) as im: + im.load() + yaml_input = im.text.get(PNG_KEY_YAML, "") + except Exception as e: + raise ValueError( + f"Cannot read YAML metadata from {png_path}: {e}" + ) from e - return yaml_input, prepend_input + if not yaml_input: + raise ValueError( + f"PNG does not contain WireViz YAML metadata: {png_path}" + ) + + if len(yaml_input) > MAX_YAML_SIZE: + raise ValueError( + f"Embedded YAML exceeds size limit " + f"({len(yaml_input)} > {MAX_YAML_SIZE} bytes): {png_path}" + ) + + return yaml_input def has_yaml_metadata(png_path: Path) -> bool: - """Check if a PNG file contains embedded WireViz YAML data.""" - png_path = Path(png_path) - if not png_path.suffix == ".png": - png_path = png_path.with_suffix(".png") + """Check if a PNG file contains embedded WireViz YAML data. + + Lightweight probe — does not decompress or validate the YAML content. + Returns False for missing or unreadable files. + """ + png_path = _resolve_png_path(png_path) if not png_path.exists(): return False - with Image.open(fp=png_path) as im: - im.load() - return PNG_KEY_YAML in im.text + try: + with Image.open(fp=png_path) as im: + im.load() + return PNG_KEY_YAML in im.text + except Exception: + return False diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_png_metadata.py b/tests/test_png_metadata.py new file mode 100644 index 0000000..96a2e97 --- /dev/null +++ b/tests/test_png_metadata.py @@ -0,0 +1,216 @@ +# -*- coding: utf-8 -*- + +"""Tests for wv_png_metadata — YAML embedding in PNG iTXT chunks.""" + +from pathlib import Path + +import pytest +from PIL import Image + +from wireviz.wv_png_metadata import ( + MAX_YAML_SIZE, + PNG_KEY_YAML, + PNG_KEY_VERSION, + has_yaml_metadata, + read_yaml_from_png, + save_yaml_to_png, +) + +SAMPLE_YAML = """\ +connectors: + X1: + pincount: 4 + X2: + pincount: 4 + +cables: + W1: + wirecount: 4 + length: 1 + +connections: + - + - X1: [1-4] + - W1: [1-4] + - X2: [1-4] +""" + + +def _create_test_png(path: Path, width: int = 10, height: int = 10) -> Path: + """Create a minimal valid PNG file for testing.""" + im = Image.new("RGB", (width, height), color="white") + im.save(path) + return path + + +# === Round-trip tests === + + +class TestRoundTrip: + def test_basic_round_trip(self, tmp_path): + """Write YAML to PNG, read it back, verify exact match.""" + png = _create_test_png(tmp_path / "test.png") + assert save_yaml_to_png(png, SAMPLE_YAML) is True + result = read_yaml_from_png(png) + assert result == SAMPLE_YAML + + def test_unicode_round_trip(self, tmp_path): + """YAML with unicode characters survives embedding.""" + yaml_unicode = "# Kabelbaum\nconnectors:\n X1:\n type: 'Stecker \u00f6\u00e4\u00fc\u00df'\n pincount: 2\n" + png = _create_test_png(tmp_path / "unicode.png") + assert save_yaml_to_png(png, yaml_unicode) is True + result = read_yaml_from_png(png) + assert result == yaml_unicode + assert "\u00f6\u00e4\u00fc\u00df" in result + + def test_large_yaml_round_trip(self, tmp_path): + """YAML at a realistic large size survives embedding.""" + large_yaml = SAMPLE_YAML + (" # padding line\n" * 10000) + png = _create_test_png(tmp_path / "large.png") + assert save_yaml_to_png(png, large_yaml) is True + result = read_yaml_from_png(png) + assert result == large_yaml + + def test_multiline_yaml_round_trip(self, tmp_path): + """YAML with various whitespace patterns survives.""" + yaml_ws = "connectors:\n X1:\n notes: |\n Line 1\n Line 2\n\n Line 4 after blank\n pincount: 2\n" + png = _create_test_png(tmp_path / "multiline.png") + assert save_yaml_to_png(png, yaml_ws) is True + assert read_yaml_from_png(png) == yaml_ws + + +# === save_yaml_to_png tests === + + +class TestSave: + def test_returns_false_for_missing_file(self, tmp_path): + """save_yaml_to_png on missing file returns False.""" + missing = tmp_path / "nope.png" + assert save_yaml_to_png(missing, SAMPLE_YAML) is False + + def test_auto_appends_png_suffix(self, tmp_path): + """Passing path without .png suffix still works.""" + _create_test_png(tmp_path / "test.png") + assert save_yaml_to_png(tmp_path / "test", SAMPLE_YAML) is True + assert has_yaml_metadata(tmp_path / "test.png") + + def test_preserves_existing_metadata(self, tmp_path): + """Existing PNG text metadata is not destroyed.""" + from PIL.PngImagePlugin import PngInfo + + png_path = tmp_path / "meta.png" + im = Image.new("RGB", (10, 10), color="white") + existing = PngInfo() + existing.add_text("Software", "GraphViz 14.1") + existing.add_text("Author", "WireViz") + im.save(png_path, pnginfo=existing) + + save_yaml_to_png(png_path, SAMPLE_YAML) + + with Image.open(png_path) as im: + im.load() + assert im.text.get("Software") == "GraphViz 14.1" + assert im.text.get("Author") == "WireViz" + assert PNG_KEY_YAML in im.text + + def test_writes_version_tag(self, tmp_path): + """Metadata includes a version tag for future compatibility.""" + png = _create_test_png(tmp_path / "ver.png") + save_yaml_to_png(png, SAMPLE_YAML) + + with Image.open(png) as im: + im.load() + assert im.text.get(PNG_KEY_VERSION) == "1" + + def test_overwrites_previous_yaml(self, tmp_path): + """Calling save twice replaces the embedded YAML.""" + png = _create_test_png(tmp_path / "overwrite.png") + save_yaml_to_png(png, "first: version\n") + save_yaml_to_png(png, "second: version\n") + result = read_yaml_from_png(png) + assert result == "second: version\n" + + def test_atomic_write_no_corruption_on_success(self, tmp_path): + """File is valid PNG after successful save.""" + png = _create_test_png(tmp_path / "atomic.png") + save_yaml_to_png(png, SAMPLE_YAML) + # verify it's still a valid, openable PNG + with Image.open(png) as im: + im.load() + assert im.size == (10, 10) + + +# === read_yaml_from_png tests === + + +class TestRead: + def test_raises_file_not_found(self, tmp_path): + """read_yaml_from_png on missing file raises FileNotFoundError.""" + with pytest.raises(FileNotFoundError, match="PNG not found"): + read_yaml_from_png(tmp_path / "nope.png") + + def test_raises_for_corrupt_file(self, tmp_path): + """read_yaml_from_png on non-PNG file raises ValueError.""" + corrupt = tmp_path / "corrupt.png" + corrupt.write_text("this is not a png") + with pytest.raises(ValueError, match="Cannot read YAML metadata"): + read_yaml_from_png(corrupt) + + def test_raises_for_png_without_metadata(self, tmp_path): + """read_yaml_from_png on plain PNG raises ValueError.""" + png = _create_test_png(tmp_path / "plain.png") + with pytest.raises(ValueError, match="does not contain WireViz YAML"): + read_yaml_from_png(png) + + def test_raises_for_oversized_yaml(self, tmp_path): + """read_yaml_from_png rejects YAML exceeding the size limit. + + Pillow's own MAX_TEXT_CHUNK guard fires at the decompression level + before our application-level size check — defense in depth. + Both produce ValueError, just with different messages. + """ + png = _create_test_png(tmp_path / "bomb.png") + from PIL.PngImagePlugin import PngInfo + + with Image.open(png) as im: + im.load() + txt = PngInfo() + txt.add_itxt(PNG_KEY_YAML, "x" * (MAX_YAML_SIZE + 1), zip=True) + im.save(png, pnginfo=txt) + + # either our size check or Pillow's decompression limit will fire + with pytest.raises(ValueError): + read_yaml_from_png(png) + + def test_auto_appends_png_suffix(self, tmp_path): + """Passing path without .png suffix still works.""" + png = _create_test_png(tmp_path / "test.png") + save_yaml_to_png(png, SAMPLE_YAML) + result = read_yaml_from_png(tmp_path / "test") + assert result == SAMPLE_YAML + + +# === has_yaml_metadata tests === + + +class TestHasMetadata: + def test_true_after_embedding(self, tmp_path): + """has_yaml_metadata returns True after embedding.""" + png = _create_test_png(tmp_path / "test.png") + save_yaml_to_png(png, SAMPLE_YAML) + assert has_yaml_metadata(png) is True + + def test_false_for_nonexistent_file(self, tmp_path): + """has_yaml_metadata returns False for missing file.""" + assert has_yaml_metadata(tmp_path / "nope.png") is False + + def test_false_for_plain_png(self, tmp_path): + """has_yaml_metadata returns False for PNG without metadata.""" + png = _create_test_png(tmp_path / "plain.png") + assert has_yaml_metadata(png) is False + + def test_false_for_corrupt_file(self, tmp_path): + """has_yaml_metadata returns False for non-PNG file.""" + corrupt = tmp_path / "corrupt.png" + corrupt.write_text("not a png") + assert has_yaml_metadata(corrupt) is False