diff --git a/internal/worker/command_blender.go b/internal/worker/command_blender.go index 3cf75d79..a9c03186 100644 --- a/internal/worker/command_blender.go +++ b/internal/worker/command_blender.go @@ -28,6 +28,7 @@ import ( "fmt" "io" "os/exec" + "time" "github.com/google/shlex" "github.com/rs/zerolog" @@ -38,6 +39,8 @@ import ( // Effectively this determines the maximum line length that can be handled. const StdoutBufferSize = 40 * 1024 +const timeFormat = time.RFC3339Nano + type BlenderParameters struct { exe string // Expansion of `{blender}`: executable path + its CLI parameters defined by the Manager. argsBefore []string // Additional CLI arguments defined by the job compiler script, to go before the blend file name. @@ -66,7 +69,11 @@ func (ce *CommandExecutor) cmdBlenderRender(ctx context.Context, logger zerolog. return err } + blenderPID := execCmd.Process.Pid + logger = logger.With().Int("pid", blenderPID).Logger() + reader := bufio.NewReaderSize(outPipe, StdoutBufferSize) + logChunker := NewLogChunker(taskID, ce.listener) for { lineBytes, isPrefix, readErr := reader.ReadLine() @@ -86,11 +93,16 @@ func (ce *CommandExecutor) cmdBlenderRender(ctx context.Context, logger zerolog. } logger.Debug().Msg(line) - // TODO: don't send the log line-by-line, but send in chunks. - if err := ce.listener.LogProduced(ctx, taskID, line); err != nil { - return err - } + + timestamp := time.Now().Format(timeFormat) + // %35s because trailing zeroes in the nanoseconds aren't output by the + // formatted timestamp, and thus it has a variable length. Using a fixed + // width in this Sprintf() call ensures the rest of the line aligns visually + // with the preceeding ones. + logLine := fmt.Sprintf("%35s: pid=%d > %s", timestamp, blenderPID, line) + logChunker.Append(ctx, logLine) } + logChunker.Flush(ctx) if err := execCmd.Wait(); err != nil { logger.Error().Err(err).Msg("error in CLI execution") diff --git a/internal/worker/log_chunker.go b/internal/worker/log_chunker.go new file mode 100644 index 00000000..70837776 --- /dev/null +++ b/internal/worker/log_chunker.go @@ -0,0 +1,70 @@ +package worker + +/* ***** BEGIN GPL LICENSE BLOCK ***** + * + * Original Code Copyright (C) 2022 Blender Foundation. + * + * This file is part of Flamenco. + * + * Flamenco is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software + * Foundation, either version 3 of the License, or (at your option) any later + * version. + * + * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + * A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with + * Flamenco. If not, see . + * + * ***** END GPL LICENSE BLOCK ***** */ + +import ( + "context" + "strings" +) + +const defaultLogChunkerBufferFlushSize = 1024 + +// LogChunker gathers log lines in memory and sends them to a CommandListener. +// NOTE: LogChunker is not thread-safe. +type LogChunker struct { + taskID string + listener CommandListener + buffer strings.Builder + bufferFlushSize int // When the buffer grows beyond this many bytes, flush. +} + +func NewLogChunker(taskID string, listerer CommandListener) *LogChunker { + return &LogChunker{ + taskID: taskID, + listener: listerer, + buffer: strings.Builder{}, + bufferFlushSize: defaultLogChunkerBufferFlushSize, + } +} + +// Flush sends any buffered logs to the listener. +func (lc *LogChunker) Flush(ctx context.Context) error { + if lc.buffer.Len() == 0 { + return nil + } + + err := lc.listener.LogProduced(ctx, lc.taskID, lc.buffer.String()) + lc.buffer.Reset() + return err +} + +func (lc *LogChunker) Append(ctx context.Context, logLines ...string) error { + for idx := range logLines { + lc.buffer.WriteString(logLines[idx]) + lc.buffer.WriteByte('\n') + } + + if lc.buffer.Len() > lc.bufferFlushSize { + return lc.Flush(ctx) + } + + return nil +} diff --git a/internal/worker/log_chunker_test.go b/internal/worker/log_chunker_test.go new file mode 100644 index 00000000..3023716f --- /dev/null +++ b/internal/worker/log_chunker_test.go @@ -0,0 +1,83 @@ +package worker + +import ( + "context" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "gitlab.com/blender/flamenco-ng-poc/internal/worker/mocks" +) + +/* ***** BEGIN GPL LICENSE BLOCK ***** + * + * Original Code Copyright (C) 2022 Blender Foundation. + * + * This file is part of Flamenco. + * + * Flamenco is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software + * Foundation, either version 3 of the License, or (at your option) any later + * version. + * + * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + * A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with + * Flamenco. If not, see . + * + * ***** END GPL LICENSE BLOCK ***** */ + +func TestLogChunkerEmpty(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mockListener := mocks.NewMockCommandListener(mockCtrl) + lc := NewLogChunker("taskID", mockListener) + + // Note: no call to mockListener is expected. + err := lc.Flush(context.Background()) + assert.NoError(t, err) + + assert.Equal(t, 0, lc.buffer.Len(), "buffer should be empty") +} + +func TestLogChunkerSimple(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mockListener := mocks.NewMockCommandListener(mockCtrl) + lc := NewLogChunker("taskID", mockListener) + + ctx := context.Background() + mockListener.EXPECT().LogProduced(ctx, "taskID", "just one line\n") + + err := lc.Append(ctx, "just one line") + assert.NoError(t, err) + + err = lc.Flush(ctx) + assert.NoError(t, err) + assert.Equal(t, 0, lc.buffer.Len(), "buffer should be empty") +} + +func TestLogChunkerMuchLogging(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mockListener := mocks.NewMockCommandListener(mockCtrl) + lc := NewLogChunker("taskID", mockListener) + lc.bufferFlushSize = 12 + + ctx := context.Background() + + err := lc.Append(ctx, "één regel") // 9 runes, 11 bytes, 12 with newline, within buffer size. + assert.NoError(t, err) + + mockListener.EXPECT().LogProduced(ctx, "taskID", "één regel\nsecond line\n") + + err = lc.Append(ctx, "second line") // this pushes the buffer over its max size. + assert.NoError(t, err) + + assert.Equal(t, 0, lc.buffer.Len(), "buffer should be empty") +}