diff --git a/internal/worker/cli_runner.go b/internal/worker/cli_runner.go index d90848bb..77945a69 100644 --- a/internal/worker/cli_runner.go +++ b/internal/worker/cli_runner.go @@ -34,5 +34,5 @@ func NewCLIRunner() *CLIRunner { } func (cli *CLIRunner) CommandContext(ctx context.Context, name string, arg ...string) *exec.Cmd { - return nil + return exec.CommandContext(ctx, name, arg...) } diff --git a/internal/worker/command_blender.go b/internal/worker/command_blender.go index 3ea89839..3cf75d79 100644 --- a/internal/worker/command_blender.go +++ b/internal/worker/command_blender.go @@ -23,14 +23,21 @@ package worker /* This file contains the commands in the "blender" type group. */ import ( + "bufio" "context" "fmt" + "io" + "os/exec" "github.com/google/shlex" "github.com/rs/zerolog" "gitlab.com/blender/flamenco-ng-poc/pkg/api" ) +// The buffer size used to read stdout/stderr output from Blender. +// Effectively this determines the maximum line length that can be handled. +const StdoutBufferSize = 40 * 1024 + type BlenderParameters struct { exe string // Expansion of `{blender}`: executable path + its CLI parameters defined by the Manager. argsBefore []string // Additional CLI arguments defined by the job compiler script, to go before the blend file name. @@ -40,11 +47,79 @@ type BlenderParameters struct { // cmdBlender executes the "blender-render" command. func (ce *CommandExecutor) cmdBlenderRender(ctx context.Context, logger zerolog.Logger, taskID string, cmd api.Command) error { - parameters, err := cmdBlenderRenderParams(logger, cmd) + cmdCtx, cmdCtxCancel := context.WithCancel(ctx) + defer cmdCtxCancel() + + execCmd, err := ce.cmdBlenderRenderCommand(cmdCtx, logger, taskID, cmd) if err != nil { return err } + execCmd.Stderr = execCmd.Stdout // Redirect stderr to stdout. + outPipe, err := execCmd.StdoutPipe() + if err != nil { + return err + } + + if err := execCmd.Start(); err != nil { + logger.Error().Err(err).Msg("error starting CLI execution") + return err + } + + reader := bufio.NewReaderSize(outPipe, StdoutBufferSize) + + for { + lineBytes, isPrefix, readErr := reader.ReadLine() + if readErr == io.EOF { + break + } + if readErr != nil { + logger.Error().Err(err).Msg("error reading stdout/err") + return err + } + line := string(lineBytes) + if isPrefix { + logger.Warn(). + Str("line", fmt.Sprintf("%s...", line[:256])). + Int("lineLength", len(line)). + Msg("unexpectedly long line read, truncating") + } + + logger.Debug().Msg(line) + // TODO: don't send the log line-by-line, but send in chunks. + if err := ce.listener.LogProduced(ctx, taskID, line); err != nil { + return err + } + } + + if err := execCmd.Wait(); err != nil { + logger.Error().Err(err).Msg("error in CLI execution") + return err + } + + if execCmd.ProcessState.Success() { + logger.Info().Msg("command exited succesfully") + } else { + logger.Error(). + Int("exitCode", execCmd.ProcessState.ExitCode()). + Msg("command exited abnormally") + return fmt.Errorf("command exited abnormally with code %d", execCmd.ProcessState.ExitCode()) + } + + return nil +} + +func (ce *CommandExecutor) cmdBlenderRenderCommand( + ctx context.Context, + logger zerolog.Logger, + taskID string, + cmd api.Command, +) (*exec.Cmd, error) { + parameters, err := cmdBlenderRenderParams(logger, cmd) + if err != nil { + return nil, err + } + cliArgs := make([]string, 0) cliArgs = append(cliArgs, parameters.argsBefore...) cliArgs = append(cliArgs, parameters.blendfile) @@ -54,17 +129,18 @@ func (ce *CommandExecutor) cmdBlenderRender(ctx context.Context, logger zerolog. logger.Error(). Str("cmdName", cmd.Name). Msg("unable to create command executor") - return ErrNoExecCmd + return nil, ErrNoExecCmd } logger.Info(). Str("cmdName", cmd.Name). Str("execCmd", execCmd.String()). Msg("going to execute Blender") - if err := ce.listener.LogProduced(ctx, taskID, fmt.Sprintf("going to run: %s", cliArgs)); err != nil { - return err + if err := ce.listener.LogProduced(ctx, taskID, fmt.Sprintf("going to run: %s %q", parameters.exe, cliArgs)); err != nil { + return nil, err } - return nil + + return execCmd, nil } func cmdBlenderRenderParams(logger zerolog.Logger, cmd api.Command) (BlenderParameters, error) { diff --git a/internal/worker/command_blender_test.go b/internal/worker/command_blender_test.go index 02397d56..ef9820d1 100644 --- a/internal/worker/command_blender_test.go +++ b/internal/worker/command_blender_test.go @@ -36,7 +36,6 @@ func TestCmdBlenderSimpleCliArgs(t *testing.T) { ce, mocks := testCommandExecutor(t, mockCtrl) - ctx := context.Background() taskID := "1d54c6fe-1242-4c8f-bd63-5a09e358d7b6" cmd := api.Command{ Name: "blender", @@ -49,9 +48,9 @@ func TestCmdBlenderSimpleCliArgs(t *testing.T) { } cliArgs := []string{"--background", "file.blend", "--render-output", "/frames"} - mocks.cli.EXPECT().CommandContext(ctx, "/path/to/blender", cliArgs).Return(nil) + mocks.cli.EXPECT().CommandContext(gomock.Any(), "/path/to/blender", cliArgs).Return(nil) - err := ce.cmdBlenderRender(ctx, zerolog.Nop(), taskID, cmd) + err := ce.cmdBlenderRender(context.Background(), zerolog.Nop(), taskID, cmd) assert.Equal(t, ErrNoExecCmd, err, "nil *exec.Cmd should result in ErrNoExecCmd") } @@ -61,7 +60,6 @@ func TestCmdBlenderCliArgsInExeParameter(t *testing.T) { ce, mocks := testCommandExecutor(t, mockCtrl) - ctx := context.Background() taskID := "1d54c6fe-1242-4c8f-bd63-5a09e358d7b6" cmd := api.Command{ Name: "blender", @@ -73,7 +71,7 @@ func TestCmdBlenderCliArgsInExeParameter(t *testing.T) { }, } - mocks.cli.EXPECT().CommandContext(ctx, + mocks.cli.EXPECT().CommandContext(gomock.Any(), "/path/to/blender", // from 'exe' "--factory-startup", // from 'exe' "--python-expr", // from 'exe' @@ -83,6 +81,6 @@ func TestCmdBlenderCliArgsInExeParameter(t *testing.T) { "--debug", // from 'args' ).Return(nil) - err := ce.cmdBlenderRender(ctx, zerolog.Nop(), taskID, cmd) + err := ce.cmdBlenderRender(context.Background(), zerolog.Nop(), taskID, cmd) assert.Equal(t, ErrNoExecCmd, err, "nil *exec.Cmd should result in ErrNoExecCmd") }