From f244355328cfa0d83e554bdb01a2dc585ed16eba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Fri, 24 Jun 2022 17:50:38 +0200 Subject: [PATCH] Worker: parse stdout of Blender to recognise saved files Prepare the Worker for submission of last-rendered images to Manager, by parsing `stdout` of Blender to see which files were saved. This needs more work, as now just an error "not implemented" is logged. --- internal/worker/command_blender.go | 24 ++++++++++++++++++++++++ internal/worker/command_blender_test.go | 17 +++++++++++++++++ internal/worker/listener.go | 3 +-- 3 files changed, 42 insertions(+), 2 deletions(-) diff --git a/internal/worker/command_blender.go b/internal/worker/command_blender.go index 03ab52fd..5d31f98a 100644 --- a/internal/worker/command_blender.go +++ b/internal/worker/command_blender.go @@ -11,6 +11,7 @@ import ( "io" "os/exec" "path/filepath" + "regexp" "github.com/google/shlex" "github.com/rs/zerolog" @@ -23,6 +24,8 @@ import ( // Effectively this determines the maximum line length that can be handled. const StdoutBufferSize = 40 * 1024 +var regexpFileSaved = regexp.MustCompile("Saved: '(.*)'") + type BlenderParameters struct { exe string // Expansion of `{blender}`: executable path + its CLI parameters defined by the Manager. argsBefore []string // Additional CLI arguments defined by the job compiler script, to go before the blend file name. @@ -75,6 +78,8 @@ func (ce *CommandExecutor) cmdBlenderRender(ctx context.Context, logger zerolog. } logger.Debug().Msg(line) + ce.processLineBlender(ctx, logger, taskID, line) + if err := logChunker.Append(ctx, fmt.Sprintf("pid=%d > %s", blenderPID, line)); err != nil { return fmt.Errorf("appending log entry to log chunker: %w", err) } @@ -188,3 +193,22 @@ func cmdBlenderRenderParams(logger zerolog.Logger, cmd api.Command) (BlenderPara return parameters, nil } + +func (ce *CommandExecutor) processLineBlender(ctx context.Context, logger zerolog.Logger, taskID string, line string) { + // TODO: check for "Warning: Unable to open" and other indicators of missing + // files. Flamenco v2 updated the task.Activity field for such situations. + + match := regexpFileSaved.FindStringSubmatch(line) + if len(match) < 2 { + return + } + filename := match[1] + + logger = logger.With().Str("outputFile", filename).Logger() + logger.Info().Msg("output produced") + + err := ce.listener.OutputProduced(ctx, taskID, filename) + if err != nil { + logger.Warn().Err(err).Msg("error submitting produced output to listener") + } +} diff --git a/internal/worker/command_blender_test.go b/internal/worker/command_blender_test.go index b654da2b..66e6dbbd 100644 --- a/internal/worker/command_blender_test.go +++ b/internal/worker/command_blender_test.go @@ -6,6 +6,7 @@ import ( "github.com/golang/mock/gomock" "github.com/rs/zerolog" + "github.com/rs/zerolog/log" "github.com/stretchr/testify/assert" "git.blender.org/flamenco/pkg/api" @@ -67,3 +68,19 @@ func TestCmdBlenderCliArgsInExeParameter(t *testing.T) { err := ce.cmdBlenderRender(context.Background(), zerolog.Nop(), taskID, cmd) assert.Equal(t, ErrNoExecCmd, err, "nil *exec.Cmd should result in ErrNoExecCmd") } + +func TestProcessLineBlender(t *testing.T) { + ctx := context.Background() + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + ce, mocks := testCommandExecutor(t, mockCtrl) + taskID := "c194ea21-1fda-46f6-bc9a-34bd302cfb19" + + // This shouldn't call anything on the mocks. + ce.processLineBlender(ctx, log.Logger, taskID, "starting Blender") + + // This should be recognised as produced output. + mocks.listener.EXPECT().OutputProduced(ctx, taskID, "/path/to/file.exr") + ce.processLineBlender(ctx, log.Logger, taskID, "Saved: '/path/to/file.exr'") +} diff --git a/internal/worker/listener.go b/internal/worker/listener.go index bed90a39..51396cca 100644 --- a/internal/worker/listener.go +++ b/internal/worker/listener.go @@ -106,8 +106,7 @@ func (l *Listener) LogProduced(ctx context.Context, taskID string, logLines ...s // OutputProduced tells the Manager there has been some output (most commonly a rendered frame or video). func (l *Listener) OutputProduced(ctx context.Context, taskID string, outputLocation string) error { - // TODO: implement - return nil + return fmt.Errorf("Listener.OutputProduced(%q, %q): not implemented yet", taskID, outputLocation) } func (l *Listener) sendTaskUpdate(ctx context.Context, taskID string, update api.TaskUpdateJSONRequestBody) error {