Worker: parse stdout of Blender to recognise saved files

Prepare the Worker for submission of last-rendered images to Manager, by
parsing `stdout` of Blender to see which files were saved.

This needs more work, as now just an error "not implemented" is logged.
This commit is contained in:
Sybren A. Stüvel 2022-06-24 17:50:38 +02:00
parent 1f8c2df919
commit f244355328
3 changed files with 42 additions and 2 deletions

View File

@ -11,6 +11,7 @@ import (
"io" "io"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"regexp"
"github.com/google/shlex" "github.com/google/shlex"
"github.com/rs/zerolog" "github.com/rs/zerolog"
@ -23,6 +24,8 @@ import (
// Effectively this determines the maximum line length that can be handled. // Effectively this determines the maximum line length that can be handled.
const StdoutBufferSize = 40 * 1024 const StdoutBufferSize = 40 * 1024
var regexpFileSaved = regexp.MustCompile("Saved: '(.*)'")
type BlenderParameters struct { type BlenderParameters struct {
exe string // Expansion of `{blender}`: executable path + its CLI parameters defined by the Manager. exe string // Expansion of `{blender}`: executable path + its CLI parameters defined by the Manager.
argsBefore []string // Additional CLI arguments defined by the job compiler script, to go before the blend file name. argsBefore []string // Additional CLI arguments defined by the job compiler script, to go before the blend file name.
@ -75,6 +78,8 @@ func (ce *CommandExecutor) cmdBlenderRender(ctx context.Context, logger zerolog.
} }
logger.Debug().Msg(line) logger.Debug().Msg(line)
ce.processLineBlender(ctx, logger, taskID, line)
if err := logChunker.Append(ctx, fmt.Sprintf("pid=%d > %s", blenderPID, line)); err != nil { if err := logChunker.Append(ctx, fmt.Sprintf("pid=%d > %s", blenderPID, line)); err != nil {
return fmt.Errorf("appending log entry to log chunker: %w", err) return fmt.Errorf("appending log entry to log chunker: %w", err)
} }
@ -188,3 +193,22 @@ func cmdBlenderRenderParams(logger zerolog.Logger, cmd api.Command) (BlenderPara
return parameters, nil return parameters, nil
} }
func (ce *CommandExecutor) processLineBlender(ctx context.Context, logger zerolog.Logger, taskID string, line string) {
// TODO: check for "Warning: Unable to open" and other indicators of missing
// files. Flamenco v2 updated the task.Activity field for such situations.
match := regexpFileSaved.FindStringSubmatch(line)
if len(match) < 2 {
return
}
filename := match[1]
logger = logger.With().Str("outputFile", filename).Logger()
logger.Info().Msg("output produced")
err := ce.listener.OutputProduced(ctx, taskID, filename)
if err != nil {
logger.Warn().Err(err).Msg("error submitting produced output to listener")
}
}

View File

@ -6,6 +6,7 @@ import (
"github.com/golang/mock/gomock" "github.com/golang/mock/gomock"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"git.blender.org/flamenco/pkg/api" "git.blender.org/flamenco/pkg/api"
@ -67,3 +68,19 @@ func TestCmdBlenderCliArgsInExeParameter(t *testing.T) {
err := ce.cmdBlenderRender(context.Background(), zerolog.Nop(), taskID, cmd) err := ce.cmdBlenderRender(context.Background(), zerolog.Nop(), taskID, cmd)
assert.Equal(t, ErrNoExecCmd, err, "nil *exec.Cmd should result in ErrNoExecCmd") assert.Equal(t, ErrNoExecCmd, err, "nil *exec.Cmd should result in ErrNoExecCmd")
} }
func TestProcessLineBlender(t *testing.T) {
ctx := context.Background()
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
ce, mocks := testCommandExecutor(t, mockCtrl)
taskID := "c194ea21-1fda-46f6-bc9a-34bd302cfb19"
// This shouldn't call anything on the mocks.
ce.processLineBlender(ctx, log.Logger, taskID, "starting Blender")
// This should be recognised as produced output.
mocks.listener.EXPECT().OutputProduced(ctx, taskID, "/path/to/file.exr")
ce.processLineBlender(ctx, log.Logger, taskID, "Saved: '/path/to/file.exr'")
}

View File

@ -106,8 +106,7 @@ func (l *Listener) LogProduced(ctx context.Context, taskID string, logLines ...s
// OutputProduced tells the Manager there has been some output (most commonly a rendered frame or video). // OutputProduced tells the Manager there has been some output (most commonly a rendered frame or video).
func (l *Listener) OutputProduced(ctx context.Context, taskID string, outputLocation string) error { func (l *Listener) OutputProduced(ctx context.Context, taskID string, outputLocation string) error {
// TODO: implement return fmt.Errorf("Listener.OutputProduced(%q, %q): not implemented yet", taskID, outputLocation)
return nil
} }
func (l *Listener) sendTaskUpdate(ctx context.Context, taskID string, update api.TaskUpdateJSONRequestBody) error { func (l *Listener) sendTaskUpdate(ctx context.Context, taskID string, update api.TaskUpdateJSONRequestBody) error {