Address code review findings for YAML-in-PNG feature
Some checks are pending
Create Examples / build (ubuntu-22.04, 3.7) (push) Waiting to run
Create Examples / build (ubuntu-22.04, 3.8) (push) Waiting to run
Create Examples / build (ubuntu-latest, 3.10) (push) Waiting to run
Create Examples / build (ubuntu-latest, 3.11) (push) Waiting to run
Create Examples / build (ubuntu-latest, 3.12) (push) Waiting to run
Create Examples / build (ubuntu-latest, 3.9) (push) Waiting to run

Critical fixes:
- C1: Preserve existing PNG text metadata when embedding YAML
- C2: Exception handling — metadata failure no longer crashes parse()
- C3: Atomic write via tempfile+rename prevents corruption

Important fixes:
- I1: Size guard (10MB) on read_yaml_from_png for untrusted PNGs
- I2: Existence check and meaningful errors in read_yaml_from_png
- I3: ParsedInput NamedTuple replaces raw 3-tuple return
- I4: Normalize output_formats to tuple to prevent substring matching
- I5: 19-test suite covering round-trip, edge cases, and error paths

Also: removed dead prepend_input parameter (S3), added version tag (S1).
This commit is contained in:
Ryan Malloy 2026-02-13 00:37:19 -07:00
parent 5f54ab1f0d
commit 9ba17ef621
4 changed files with 362 additions and 48 deletions

View File

@ -5,7 +5,7 @@ import platform
import sys import sys
from errno import EINVAL, ENAMETOOLONG from errno import EINVAL, ENAMETOOLONG
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Tuple, Union from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Union
import yaml import yaml
@ -26,6 +26,13 @@ from wireviz.wv_utils import (
from . import APP_NAME from . import APP_NAME
class ParsedInput(NamedTuple):
"""Result of parsing the input argument to parse()."""
data: Dict # parsed YAML as a dict
source_path: Optional[Path] # file path if input was a file, else None
raw_yaml: Optional[str] # raw YAML string, None when input was a Dict
def parse( def parse(
inp: Union[Path, str, Dict], inp: Union[Path, str, Dict],
return_types: Union[None, str, Tuple[str]] = None, return_types: Union[None, str, Tuple[str]] = None,
@ -89,21 +96,28 @@ def parse(
if not output_formats and not return_types: if not output_formats and not return_types:
raise Exception("No output formats or return types specified") raise Exception("No output formats or return types specified")
yaml_data, yaml_file, yaml_str = _get_yaml_data_and_path(inp) # normalize output_formats to a tuple to prevent substring matching bugs
if not isinstance(yaml_data, dict): # ("png" in "pngsvg" == True, but "png" in ("pngsvg",) == False)
if isinstance(output_formats, str):
output_formats = (output_formats,)
parsed = _parse_input(inp)
if not isinstance(parsed.data, dict):
raise TypeError( raise TypeError(
f"Expected a dict as top-level YAML input, but got: {type(yaml_data)}" f"Expected a dict as top-level YAML input, but got: {type(parsed.data)}"
) )
yaml_data = parsed.data
if output_formats: if output_formats:
# need to write data to file, determine output directory and filename # need to write data to file, determine output directory and filename
output_dir = _get_output_dir(yaml_file, output_dir) output_dir = _get_output_dir(parsed.source_path, output_dir)
output_name = _get_output_name(yaml_file, output_name) output_name = _get_output_name(parsed.source_path, output_name)
output_file = output_dir / output_name output_file = output_dir / output_name
if yaml_file: if parsed.source_path:
# if reading from file, ensure that input file's parent directory # if reading from file, ensure that input file's parent directory
# is included in image_paths # is included in image_paths
default_image_path = yaml_file.parent.resolve() default_image_path = parsed.source_path.parent.resolve()
if not default_image_path in [Path(x).resolve() for x in image_paths]: if not default_image_path in [Path(x).resolve() for x in image_paths]:
image_paths.append(default_image_path) image_paths.append(default_image_path)
@ -396,8 +410,9 @@ def parse(
if output_formats: if output_formats:
harness.output(filename=output_file, fmt=output_formats, view=False) harness.output(filename=output_file, fmt=output_formats, view=False)
# embed YAML source into PNG for round-trip capability # embed YAML source into PNG for round-trip capability
if "png" in output_formats and yaml_str: # failure here is non-fatal — PNG diagram output remains valid
save_yaml_to_png(output_file, yaml_str) if "png" in output_formats and parsed.raw_yaml:
save_yaml_to_png(output_file, parsed.raw_yaml)
if return_types: if return_types:
returns = [] returns = []
@ -417,8 +432,13 @@ def parse(
return tuple(returns) if len(returns) != 1 else returns[0] return tuple(returns) if len(returns) != 1 else returns[0]
def _get_yaml_data_and_path(inp: Union[str, Path, Dict]) -> Tuple[Dict, Path, str]: def _parse_input(inp: Union[str, Path, Dict]) -> ParsedInput:
# determine whether inp is a file path, a YAML string, or a Dict """Parse the input argument into YAML data, source path, and raw string.
When inp is a Dict, source_path and raw_yaml will be None.
When inp is a YAML string, source_path will be None.
When inp is a file path, all three fields will be populated.
"""
if not isinstance(inp, Dict): # received a str or a Path if not isinstance(inp, Dict): # received a str or a Path
try: try:
yaml_path = Path(inp).expanduser().resolve(strict=True) yaml_path = Path(inp).expanduser().resolve(strict=True)
@ -446,7 +466,7 @@ def _get_yaml_data_and_path(inp: Union[str, Path, Dict]) -> Tuple[Dict, Path, st
yaml_data = inp yaml_data = inp
yaml_path = None yaml_path = None
yaml_str = None yaml_str = None
return yaml_data, yaml_path, yaml_str return ParsedInput(data=yaml_data, source_path=yaml_path, raw_yaml=yaml_str)
def _get_output_dir(input_file: Path, default_output_dir: Path) -> Path: def _get_output_dir(input_file: Path, default_output_dir: Path) -> Path:

View File

@ -1,63 +1,141 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""Embed and extract YAML source data in PNG metadata (iTXT chunks).""" """Embed and extract YAML source data in PNG metadata (iTXT chunks).
PNGs support arbitrary text metadata via iTXT chunks. We use compressed
iTXT to store the original YAML harness definition, enabling round-trip
workflows: share a PNG extract the YAML regenerate or edit.
Security note: YAML extracted from untrusted PNGs must be parsed with
yaml.safe_load() and validated before use. A size limit is enforced
on extraction to guard against decompression bombs.
"""
import logging
import os
import tempfile
from pathlib import Path from pathlib import Path
from typing import Optional, Tuple from typing import Optional, Tuple
from PIL import Image from PIL import Image
from PIL.PngImagePlugin import PngInfo from PIL.PngImagePlugin import PngInfo
logger = logging.getLogger(__name__)
PNG_KEY_YAML = "wireviz_yaml" PNG_KEY_YAML = "wireviz_yaml"
PNG_KEY_PREPEND = "wireviz_prepend_yaml" PNG_KEY_VERSION = "wireviz_meta_version"
META_VERSION = "1"
# 10 MB — no harness definition should approach this
MAX_YAML_SIZE = 10 * 1024 * 1024
def save_yaml_to_png( def _resolve_png_path(path: Path) -> Path:
png_path: Path, """Ensure path has .png suffix."""
yaml_input: str, path = Path(path)
prepend_input: str = "", if path.suffix != ".png":
) -> None: path = path.with_suffix(".png")
"""Save YAML source as compressed iTXT metadata in a PNG file.""" return path
png_path = Path(png_path)
if not png_path.suffix == ".png":
png_path = png_path.with_suffix(".png") def save_yaml_to_png(png_path: Path, yaml_input: str) -> bool:
"""Save YAML source as compressed iTXT metadata in a PNG file.
Preserves any existing text metadata in the PNG. Uses atomic
write-to-temp-then-rename to prevent corruption on interrupted writes.
Returns True if metadata was successfully embedded, False otherwise.
Failure is non-fatal the PNG diagram output remains valid.
"""
png_path = _resolve_png_path(png_path)
if not png_path.exists(): if not png_path.exists():
return logger.warning("Cannot embed YAML metadata: PNG not found: %s", png_path)
return False
with Image.open(fp=png_path) as im: try:
txt = PngInfo() with Image.open(fp=png_path) as im:
txt.add_itxt(PNG_KEY_YAML, yaml_input, zip=True) im.load() # force full read before we overwrite
if prepend_input:
txt.add_itxt(PNG_KEY_PREPEND, prepend_input, zip=True) # preserve existing text metadata
im.save(fp=png_path, pnginfo=txt) txt = PngInfo()
for key, value in im.text.items():
if key not in (PNG_KEY_YAML, PNG_KEY_VERSION):
txt.add_text(key, value)
txt.add_itxt(PNG_KEY_YAML, yaml_input, zip=True)
txt.add_text(PNG_KEY_VERSION, META_VERSION)
# atomic write: temp file → rename
fd, tmp_path = tempfile.mkstemp(
suffix=".png", dir=png_path.parent
)
try:
os.close(fd)
im.save(fp=tmp_path, pnginfo=txt)
Path(tmp_path).replace(png_path)
except Exception:
Path(tmp_path).unlink(missing_ok=True)
raise
return True
except Exception as e:
logger.warning("Failed to embed YAML metadata in %s: %s", png_path, e)
return False
def read_yaml_from_png(png_path: Path) -> Tuple[str, Optional[str]]: def read_yaml_from_png(png_path: Path) -> str:
"""Extract YAML source from a PNG file's iTXT metadata. """Extract YAML source from a PNG file's iTXT metadata.
Returns (yaml_input, prepend_input) where prepend_input may be None. Returns the embedded YAML string.
Raises:
FileNotFoundError: If the PNG file does not exist.
ValueError: If the PNG has no WireViz metadata, is corrupt,
or the embedded data exceeds the size limit.
""" """
png_path = Path(png_path) png_path = _resolve_png_path(png_path)
if not png_path.suffix == ".png": if not png_path.exists():
png_path = png_path.with_suffix(".png") raise FileNotFoundError(
f"Cannot read YAML metadata: PNG not found: {png_path}"
)
with Image.open(fp=png_path) as im: try:
im.load() with Image.open(fp=png_path) as im:
yaml_input = im.text.get(PNG_KEY_YAML, "") im.load()
prepend_input = im.text.get(PNG_KEY_PREPEND) yaml_input = im.text.get(PNG_KEY_YAML, "")
except Exception as e:
raise ValueError(
f"Cannot read YAML metadata from {png_path}: {e}"
) from e
return yaml_input, prepend_input if not yaml_input:
raise ValueError(
f"PNG does not contain WireViz YAML metadata: {png_path}"
)
if len(yaml_input) > MAX_YAML_SIZE:
raise ValueError(
f"Embedded YAML exceeds size limit "
f"({len(yaml_input)} > {MAX_YAML_SIZE} bytes): {png_path}"
)
return yaml_input
def has_yaml_metadata(png_path: Path) -> bool: def has_yaml_metadata(png_path: Path) -> bool:
"""Check if a PNG file contains embedded WireViz YAML data.""" """Check if a PNG file contains embedded WireViz YAML data.
png_path = Path(png_path)
if not png_path.suffix == ".png": Lightweight probe does not decompress or validate the YAML content.
png_path = png_path.with_suffix(".png") Returns False for missing or unreadable files.
"""
png_path = _resolve_png_path(png_path)
if not png_path.exists(): if not png_path.exists():
return False return False
with Image.open(fp=png_path) as im: try:
im.load() with Image.open(fp=png_path) as im:
return PNG_KEY_YAML in im.text im.load()
return PNG_KEY_YAML in im.text
except Exception:
return False

0
tests/__init__.py Normal file
View File

216
tests/test_png_metadata.py Normal file
View File

@ -0,0 +1,216 @@
# -*- coding: utf-8 -*-
"""Tests for wv_png_metadata — YAML embedding in PNG iTXT chunks."""
from pathlib import Path
import pytest
from PIL import Image
from wireviz.wv_png_metadata import (
MAX_YAML_SIZE,
PNG_KEY_YAML,
PNG_KEY_VERSION,
has_yaml_metadata,
read_yaml_from_png,
save_yaml_to_png,
)
SAMPLE_YAML = """\
connectors:
X1:
pincount: 4
X2:
pincount: 4
cables:
W1:
wirecount: 4
length: 1
connections:
-
- X1: [1-4]
- W1: [1-4]
- X2: [1-4]
"""
def _create_test_png(path: Path, width: int = 10, height: int = 10) -> Path:
"""Create a minimal valid PNG file for testing."""
im = Image.new("RGB", (width, height), color="white")
im.save(path)
return path
# === Round-trip tests ===
class TestRoundTrip:
def test_basic_round_trip(self, tmp_path):
"""Write YAML to PNG, read it back, verify exact match."""
png = _create_test_png(tmp_path / "test.png")
assert save_yaml_to_png(png, SAMPLE_YAML) is True
result = read_yaml_from_png(png)
assert result == SAMPLE_YAML
def test_unicode_round_trip(self, tmp_path):
"""YAML with unicode characters survives embedding."""
yaml_unicode = "# Kabelbaum\nconnectors:\n X1:\n type: 'Stecker \u00f6\u00e4\u00fc\u00df'\n pincount: 2\n"
png = _create_test_png(tmp_path / "unicode.png")
assert save_yaml_to_png(png, yaml_unicode) is True
result = read_yaml_from_png(png)
assert result == yaml_unicode
assert "\u00f6\u00e4\u00fc\u00df" in result
def test_large_yaml_round_trip(self, tmp_path):
"""YAML at a realistic large size survives embedding."""
large_yaml = SAMPLE_YAML + (" # padding line\n" * 10000)
png = _create_test_png(tmp_path / "large.png")
assert save_yaml_to_png(png, large_yaml) is True
result = read_yaml_from_png(png)
assert result == large_yaml
def test_multiline_yaml_round_trip(self, tmp_path):
"""YAML with various whitespace patterns survives."""
yaml_ws = "connectors:\n X1:\n notes: |\n Line 1\n Line 2\n\n Line 4 after blank\n pincount: 2\n"
png = _create_test_png(tmp_path / "multiline.png")
assert save_yaml_to_png(png, yaml_ws) is True
assert read_yaml_from_png(png) == yaml_ws
# === save_yaml_to_png tests ===
class TestSave:
def test_returns_false_for_missing_file(self, tmp_path):
"""save_yaml_to_png on missing file returns False."""
missing = tmp_path / "nope.png"
assert save_yaml_to_png(missing, SAMPLE_YAML) is False
def test_auto_appends_png_suffix(self, tmp_path):
"""Passing path without .png suffix still works."""
_create_test_png(tmp_path / "test.png")
assert save_yaml_to_png(tmp_path / "test", SAMPLE_YAML) is True
assert has_yaml_metadata(tmp_path / "test.png")
def test_preserves_existing_metadata(self, tmp_path):
"""Existing PNG text metadata is not destroyed."""
from PIL.PngImagePlugin import PngInfo
png_path = tmp_path / "meta.png"
im = Image.new("RGB", (10, 10), color="white")
existing = PngInfo()
existing.add_text("Software", "GraphViz 14.1")
existing.add_text("Author", "WireViz")
im.save(png_path, pnginfo=existing)
save_yaml_to_png(png_path, SAMPLE_YAML)
with Image.open(png_path) as im:
im.load()
assert im.text.get("Software") == "GraphViz 14.1"
assert im.text.get("Author") == "WireViz"
assert PNG_KEY_YAML in im.text
def test_writes_version_tag(self, tmp_path):
"""Metadata includes a version tag for future compatibility."""
png = _create_test_png(tmp_path / "ver.png")
save_yaml_to_png(png, SAMPLE_YAML)
with Image.open(png) as im:
im.load()
assert im.text.get(PNG_KEY_VERSION) == "1"
def test_overwrites_previous_yaml(self, tmp_path):
"""Calling save twice replaces the embedded YAML."""
png = _create_test_png(tmp_path / "overwrite.png")
save_yaml_to_png(png, "first: version\n")
save_yaml_to_png(png, "second: version\n")
result = read_yaml_from_png(png)
assert result == "second: version\n"
def test_atomic_write_no_corruption_on_success(self, tmp_path):
"""File is valid PNG after successful save."""
png = _create_test_png(tmp_path / "atomic.png")
save_yaml_to_png(png, SAMPLE_YAML)
# verify it's still a valid, openable PNG
with Image.open(png) as im:
im.load()
assert im.size == (10, 10)
# === read_yaml_from_png tests ===
class TestRead:
def test_raises_file_not_found(self, tmp_path):
"""read_yaml_from_png on missing file raises FileNotFoundError."""
with pytest.raises(FileNotFoundError, match="PNG not found"):
read_yaml_from_png(tmp_path / "nope.png")
def test_raises_for_corrupt_file(self, tmp_path):
"""read_yaml_from_png on non-PNG file raises ValueError."""
corrupt = tmp_path / "corrupt.png"
corrupt.write_text("this is not a png")
with pytest.raises(ValueError, match="Cannot read YAML metadata"):
read_yaml_from_png(corrupt)
def test_raises_for_png_without_metadata(self, tmp_path):
"""read_yaml_from_png on plain PNG raises ValueError."""
png = _create_test_png(tmp_path / "plain.png")
with pytest.raises(ValueError, match="does not contain WireViz YAML"):
read_yaml_from_png(png)
def test_raises_for_oversized_yaml(self, tmp_path):
"""read_yaml_from_png rejects YAML exceeding the size limit.
Pillow's own MAX_TEXT_CHUNK guard fires at the decompression level
before our application-level size check defense in depth.
Both produce ValueError, just with different messages.
"""
png = _create_test_png(tmp_path / "bomb.png")
from PIL.PngImagePlugin import PngInfo
with Image.open(png) as im:
im.load()
txt = PngInfo()
txt.add_itxt(PNG_KEY_YAML, "x" * (MAX_YAML_SIZE + 1), zip=True)
im.save(png, pnginfo=txt)
# either our size check or Pillow's decompression limit will fire
with pytest.raises(ValueError):
read_yaml_from_png(png)
def test_auto_appends_png_suffix(self, tmp_path):
"""Passing path without .png suffix still works."""
png = _create_test_png(tmp_path / "test.png")
save_yaml_to_png(png, SAMPLE_YAML)
result = read_yaml_from_png(tmp_path / "test")
assert result == SAMPLE_YAML
# === has_yaml_metadata tests ===
class TestHasMetadata:
def test_true_after_embedding(self, tmp_path):
"""has_yaml_metadata returns True after embedding."""
png = _create_test_png(tmp_path / "test.png")
save_yaml_to_png(png, SAMPLE_YAML)
assert has_yaml_metadata(png) is True
def test_false_for_nonexistent_file(self, tmp_path):
"""has_yaml_metadata returns False for missing file."""
assert has_yaml_metadata(tmp_path / "nope.png") is False
def test_false_for_plain_png(self, tmp_path):
"""has_yaml_metadata returns False for PNG without metadata."""
png = _create_test_png(tmp_path / "plain.png")
assert has_yaml_metadata(png) is False
def test_false_for_corrupt_file(self, tmp_path):
"""has_yaml_metadata returns False for non-PNG file."""
corrupt = tmp_path / "corrupt.png"
corrupt.write_text("not a png")
assert has_yaml_metadata(corrupt) is False