Worker: make sure long lines are broken on character boundaries
When a command (like `blender` or `ffmpeg`) outputs lines that are longer than our buffer, they are broken into buffer-sized chunks. Extra code has been added to ensure those chunks consist of valid UTF-8 characters.
This commit is contained in:
parent
fe8b6e025e
commit
ced826581a
@ -12,8 +12,9 @@ import (
|
|||||||
"github.com/rs/zerolog"
|
"github.com/rs/zerolog"
|
||||||
)
|
)
|
||||||
|
|
||||||
// The buffer size used to read stdout/stderr output from subprocesses.
|
// The buffer size used to read stdout/stderr output from subprocesses, in
|
||||||
// Effectively this determines the maximum line length that can be handled.
|
// bytes. Effectively this determines the maximum line length that can be
|
||||||
|
// handled in one go. Lines that are longer will be broken up.
|
||||||
const StdoutBufferSize = 40 * 1024
|
const StdoutBufferSize = 40 * 1024
|
||||||
|
|
||||||
// CLIRunner is a wrapper around exec.CommandContext() to allow mocking.
|
// CLIRunner is a wrapper around exec.CommandContext() to allow mocking.
|
||||||
@ -60,6 +61,12 @@ func (cli *CLIRunner) RunWithTextOutput(
|
|||||||
// of simply returning, because the function must be run to completion in
|
// of simply returning, because the function must be run to completion in
|
||||||
// order to wait for processes (and not create defunct ones).
|
// order to wait for processes (and not create defunct ones).
|
||||||
var returnErr error = nil
|
var returnErr error = nil
|
||||||
|
|
||||||
|
// If a line longer than our buffer is received, it will be trimmed to the
|
||||||
|
// bufffer length. This means that it may not end on a valid character
|
||||||
|
// boundary. Any leftover bytes are collected here, and prepended to the next
|
||||||
|
// line.
|
||||||
|
leftovers := []byte{}
|
||||||
readloop:
|
readloop:
|
||||||
for {
|
for {
|
||||||
lineBytes, isPrefix, readErr := reader.ReadLine()
|
lineBytes, isPrefix, readErr := reader.ReadLine()
|
||||||
@ -73,12 +80,24 @@ readloop:
|
|||||||
break readloop
|
break readloop
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Prepend any leftovers from the previous line to the received bytes.
|
||||||
|
if len(leftovers) > 0 {
|
||||||
|
lineBytes = append(leftovers, lineBytes...)
|
||||||
|
leftovers = []byte{}
|
||||||
|
}
|
||||||
|
// Make sure long lines are broken on character boundaries.
|
||||||
|
lineBytes, leftovers = splitOnCharacterBoundary(lineBytes)
|
||||||
|
|
||||||
line := string(lineBytes)
|
line := string(lineBytes)
|
||||||
if isPrefix {
|
if isPrefix {
|
||||||
|
prefix := []rune(line)
|
||||||
|
if len(prefix) > 256 {
|
||||||
|
prefix = prefix[:256]
|
||||||
|
}
|
||||||
logger.Warn().
|
logger.Warn().
|
||||||
Str("line", fmt.Sprintf("%s...", line[:256])).
|
Str("line", fmt.Sprintf("%s...", string(prefix))).
|
||||||
Int("lineLength", len(line)).
|
Int("bytesRead", len(lineBytes)).
|
||||||
Msg("unexpectedly long line read, truncating")
|
Msg("unexpectedly long line read, will be split up")
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Debug().Msg(line)
|
logger.Debug().Msg(line)
|
||||||
|
31
internal/worker/cli_runner/strings.go
Normal file
31
internal/worker/cli_runner/strings.go
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
package cli_runner
|
||||||
|
|
||||||
|
import (
|
||||||
|
"unicode/utf8"
|
||||||
|
)
|
||||||
|
|
||||||
|
// splitOnCharacterBoundary splits `b` such that `valid` + `tail` = `b` and
|
||||||
|
// `valid` is valid UTF-8.
|
||||||
|
func splitOnCharacterBoundary(b []byte) (valid []byte, tail []byte) {
|
||||||
|
totalLength := len(b)
|
||||||
|
tailBytes := 0
|
||||||
|
for {
|
||||||
|
valid = b[:totalLength-tailBytes]
|
||||||
|
r, size := utf8.DecodeLastRune(valid)
|
||||||
|
switch {
|
||||||
|
case r == utf8.RuneError && size == 0:
|
||||||
|
// valid is empty, which means 'b' consists of only non-UTF8 bytes.
|
||||||
|
return valid, b
|
||||||
|
case r == utf8.RuneError && size == 1:
|
||||||
|
// The last bytes do not form a valid rune. See what happens if we move
|
||||||
|
// one byte from `valid` to `tail`.
|
||||||
|
tailBytes++
|
||||||
|
continue
|
||||||
|
case r == utf8.RuneError:
|
||||||
|
// This shouldn't happen, RuneError should only be returned with size 0 or 1.
|
||||||
|
panic(size)
|
||||||
|
default:
|
||||||
|
return valid, b[totalLength-tailBytes:]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
33
internal/worker/cli_runner/strings_test.go
Normal file
33
internal/worker/cli_runner/strings_test.go
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
package cli_runner
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSplitOnCharacterBoundary(t *testing.T) {
|
||||||
|
// Test with strings, as those are easier to type.
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
input string
|
||||||
|
wantValid string
|
||||||
|
wantTail string
|
||||||
|
}{
|
||||||
|
{"empty", "", "", ""},
|
||||||
|
{"trivial", "abc", "abc", ""},
|
||||||
|
{"valid", "Stüvel", "Stüvel", ""},
|
||||||
|
{"cats", "🐈🐈\xf0\x9f\x90\x88", "🐈🐈🐈", ""},
|
||||||
|
{"truncated-cats-1", "🐈🐈\xf0\x9f\x90", "🐈🐈", "\xf0\x9f\x90"},
|
||||||
|
{"truncated-cats-2", "🐈🐈\xf0\x9f", "🐈🐈", "\xf0\x9f"},
|
||||||
|
{"truncated-cats-3", "🐈🐈\xf0", "🐈🐈", "\xf0"},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
gotValid, gotTail := splitOnCharacterBoundary([]byte(tt.input))
|
||||||
|
assert.Equal(t, tt.input, string(gotValid)+string(gotTail))
|
||||||
|
assert.Equal(t, tt.wantValid, string(gotValid))
|
||||||
|
assert.Equal(t, tt.wantTail, string(gotTail))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user