From c79fe55068f2498555762bfc8ffa648ea910e967 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Thu, 28 Jul 2022 14:29:46 +0200 Subject: [PATCH] Worker: Refactor the running of subprocesses Blender and FFmpeg were run in the same way, using copy-pasted code. This is now abstracted away into the CLI runner, which in turn is moved into its own subpackage. No functional changes. --- cmd/flamenco-worker/main.go | 3 +- internal/worker/cli_runner.go | 20 ----- internal/worker/cli_runner/cli_runner.go | 103 +++++++++++++++++++++++ internal/worker/cli_runner/interfaces.go | 12 +++ internal/worker/command_blender.go | 79 ++++++----------- internal/worker/command_exe.go | 8 ++ internal/worker/command_ffmpeg.go | 58 ++----------- internal/worker/mocks/cli_runner.gen.go | 16 ++++ 8 files changed, 170 insertions(+), 129 deletions(-) delete mode 100644 internal/worker/cli_runner.go create mode 100644 internal/worker/cli_runner/cli_runner.go create mode 100644 internal/worker/cli_runner/interfaces.go diff --git a/cmd/flamenco-worker/main.go b/cmd/flamenco-worker/main.go index 58e8d3d8..ae19ca0b 100644 --- a/cmd/flamenco-worker/main.go +++ b/cmd/flamenco-worker/main.go @@ -23,6 +23,7 @@ import ( "git.blender.org/flamenco/internal/appinfo" "git.blender.org/flamenco/internal/find_ffmpeg" "git.blender.org/flamenco/internal/worker" + "git.blender.org/flamenco/internal/worker/cli_runner" ) var ( @@ -132,7 +133,7 @@ func main() { return } - cliRunner := worker.NewCLIRunner() + cliRunner := cli_runner.NewCLIRunner() listener = worker.NewListener(client, buffer) cmdRunner := worker.NewCommandExecutor(cliRunner, listener, timeService) taskRunner := worker.NewTaskExecutor(cmdRunner, listener) diff --git a/internal/worker/cli_runner.go b/internal/worker/cli_runner.go deleted file mode 100644 index d495dc24..00000000 --- a/internal/worker/cli_runner.go +++ /dev/null @@ -1,20 +0,0 @@ -package worker - -// SPDX-License-Identifier: GPL-3.0-or-later - -import ( - "context" - "os/exec" -) - -// CLIRunner is a wrapper around exec.CommandContext() to allow mocking. -type CLIRunner struct { -} - -func NewCLIRunner() *CLIRunner { - return &CLIRunner{} -} - -func (cli *CLIRunner) CommandContext(ctx context.Context, name string, arg ...string) *exec.Cmd { - return exec.CommandContext(ctx, name, arg...) -} diff --git a/internal/worker/cli_runner/cli_runner.go b/internal/worker/cli_runner/cli_runner.go new file mode 100644 index 00000000..245bad5e --- /dev/null +++ b/internal/worker/cli_runner/cli_runner.go @@ -0,0 +1,103 @@ +package cli_runner + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "bufio" + "context" + "fmt" + "io" + "os/exec" + + "github.com/rs/zerolog" +) + +// The buffer size used to read stdout/stderr output from subprocesses. +// Effectively this determines the maximum line length that can be handled. +const StdoutBufferSize = 40 * 1024 + +// CLIRunner is a wrapper around exec.CommandContext() to allow mocking. +type CLIRunner struct { +} + +func NewCLIRunner() *CLIRunner { + return &CLIRunner{} +} + +func (cli *CLIRunner) CommandContext(ctx context.Context, name string, arg ...string) *exec.Cmd { + return exec.CommandContext(ctx, name, arg...) +} + +// RunWithTextOutput runs a command and sends its output line-by-line to the +// lineChannel. Stdout and stderr are combined. +func (cli *CLIRunner) RunWithTextOutput( + ctx context.Context, + logger zerolog.Logger, + execCmd *exec.Cmd, + logChunker LogChunker, + lineChannel chan<- string, +) error { + outPipe, err := execCmd.StdoutPipe() + if err != nil { + return err + } + execCmd.Stderr = execCmd.Stdout // Redirect stderr to stdout. + + if err := execCmd.Start(); err != nil { + logger.Error().Err(err).Msg("error starting CLI execution") + return err + } + + blenderPID := execCmd.Process.Pid + logger = logger.With().Int("pid", blenderPID).Logger() + + reader := bufio.NewReaderSize(outPipe, StdoutBufferSize) + + for { + lineBytes, isPrefix, readErr := reader.ReadLine() + if readErr == io.EOF { + break + } + if readErr != nil { + logger.Error().Err(err).Msg("error reading stdout/err") + return err + } + + line := string(lineBytes) + if isPrefix { + logger.Warn(). + Str("line", fmt.Sprintf("%s...", line[:256])). + Int("lineLength", len(line)). + Msg("unexpectedly long line read, truncating") + } + + logger.Debug().Msg(line) + if lineChannel != nil { + lineChannel <- line + } + + if err := logChunker.Append(ctx, fmt.Sprintf("pid=%d > %s", blenderPID, line)); err != nil { + return fmt.Errorf("appending log entry to log chunker: %w", err) + } + } + + if err := logChunker.Flush(ctx); err != nil { + return fmt.Errorf("flushing log chunker: %w", err) + } + + if err := execCmd.Wait(); err != nil { + logger.Error().Err(err).Msg("error in CLI execution") + return err + } + + if execCmd.ProcessState.Success() { + logger.Info().Msg("command exited succesfully") + } else { + logger.Error(). + Int("exitCode", execCmd.ProcessState.ExitCode()). + Msg("command exited abnormally") + return fmt.Errorf("command exited abnormally with code %d", execCmd.ProcessState.ExitCode()) + } + + return nil +} diff --git a/internal/worker/cli_runner/interfaces.go b/internal/worker/cli_runner/interfaces.go new file mode 100644 index 00000000..92eeadd6 --- /dev/null +++ b/internal/worker/cli_runner/interfaces.go @@ -0,0 +1,12 @@ +package cli_runner + +// SPDX-License-Identifier: GPL-3.0-or-later + +import "context" + +type LogChunker interface { + // Flush sends any buffered logs to the listener. + Flush(ctx context.Context) error + // Append log lines to the buffer, sending to the listener when the buffer gets too large. + Append(ctx context.Context, logLines ...string) error +} diff --git a/internal/worker/command_blender.go b/internal/worker/command_blender.go index b91a203f..044d34f6 100644 --- a/internal/worker/command_blender.go +++ b/internal/worker/command_blender.go @@ -5,12 +5,11 @@ package worker /* This file contains the commands in the "blender" type group. */ import ( - "bufio" "context" "fmt" - "io" "os/exec" "regexp" + "sync" "github.com/google/shlex" "github.com/rs/zerolog" @@ -20,10 +19,6 @@ import ( "git.blender.org/flamenco/pkg/crosspath" ) -// The buffer size used to read stdout/stderr output from Blender. -// Effectively this determines the maximum line length that can be handled. -const StdoutBufferSize = 40 * 1024 - var regexpFileSaved = regexp.MustCompile("Saved: '(.*)'") type BlenderParameters struct { @@ -43,65 +38,39 @@ func (ce *CommandExecutor) cmdBlenderRender(ctx context.Context, logger zerolog. return err } - outPipe, err := execCmd.StdoutPipe() - if err != nil { - return err - } - execCmd.Stderr = execCmd.Stdout // Redirect stderr to stdout. - - if err := execCmd.Start(); err != nil { - logger.Error().Err(err).Msg("error starting CLI execution") - return err - } - - blenderPID := execCmd.Process.Pid - logger = logger.With().Int("pid", blenderPID).Logger() - - reader := bufio.NewReaderSize(outPipe, StdoutBufferSize) logChunker := NewLogChunker(taskID, ce.listener, ce.timeService) + lineChannel := make(chan string) - for { - lineBytes, isPrefix, readErr := reader.ReadLine() - if readErr == io.EOF { - break - } - if readErr != nil { - logger.Error().Err(err).Msg("error reading stdout/err") - return err - } - line := string(lineBytes) - if isPrefix { - logger.Warn(). - Str("line", fmt.Sprintf("%s...", line[:256])). - Int("lineLength", len(line)). - Msg("unexpectedly long line read, truncating") + // Process the output of Blender. + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + for line := range lineChannel { + ce.processLineBlender(ctx, logger, taskID, line) } + }() - logger.Debug().Msg(line) - ce.processLineBlender(ctx, logger, taskID, line) + // Run the subprocess. + subprocessErr := ce.cli.RunWithTextOutput(ctx, + logger, + execCmd, + logChunker, + lineChannel, + ) - if err := logChunker.Append(ctx, fmt.Sprintf("pid=%d > %s", blenderPID, line)); err != nil { - return fmt.Errorf("appending log entry to log chunker: %w", err) - } - } - if err := logChunker.Flush(ctx); err != nil { - return fmt.Errorf("flushing log chunker: %w", err) - } + // Wait for the processing to stop. + close(lineChannel) + wg.Wait() - if err := execCmd.Wait(); err != nil { - logger.Error().Err(err).Msg("error in CLI execution") - return err - } - - if execCmd.ProcessState.Success() { - logger.Info().Msg("command exited succesfully") - } else { - logger.Error(). + if subprocessErr != nil { + logger.Error().Err(subprocessErr). Int("exitCode", execCmd.ProcessState.ExitCode()). Msg("command exited abnormally") - return fmt.Errorf("command exited abnormally with code %d", execCmd.ProcessState.ExitCode()) + return subprocessErr } + logger.Info().Msg("command exited succesfully") return nil } diff --git a/internal/worker/command_exe.go b/internal/worker/command_exe.go index c9c03ea2..f9693939 100644 --- a/internal/worker/command_exe.go +++ b/internal/worker/command_exe.go @@ -12,6 +12,7 @@ import ( "github.com/rs/zerolog" "github.com/rs/zerolog/log" + "git.blender.org/flamenco/internal/worker/cli_runner" "git.blender.org/flamenco/pkg/api" ) @@ -50,6 +51,13 @@ type TimeService interface { // CommandLineRunner is an interface around exec.CommandContext(). type CommandLineRunner interface { CommandContext(ctx context.Context, name string, arg ...string) *exec.Cmd + RunWithTextOutput( + ctx context.Context, + logger zerolog.Logger, + execCmd *exec.Cmd, + logChunker cli_runner.LogChunker, + lineChannel chan<- string, + ) error } // ErrNoExecCmd means CommandLineRunner.CommandContext() returned nil. diff --git a/internal/worker/command_ffmpeg.go b/internal/worker/command_ffmpeg.go index 3dd7c3aa..bc8f8632 100644 --- a/internal/worker/command_ffmpeg.go +++ b/internal/worker/command_ffmpeg.go @@ -5,11 +5,9 @@ package worker /* This file contains the commands in the "ffmpeg" type group. */ import ( - "bufio" "context" "errors" "fmt" - "io" "io/fs" "os" "os/exec" @@ -47,63 +45,17 @@ func (ce *CommandExecutor) cmdFramesToVideo(ctx context.Context, logger zerolog. } defer cleanup() - outPipe, err := execCmd.StdoutPipe() - if err != nil { - return err - } - execCmd.Stderr = execCmd.Stdout // Redirect stderr to stdout. - - if err := execCmd.Start(); err != nil { - logger.Error().Err(err).Msg("error starting CLI execution") - return err - } - - ffmpegPID := execCmd.Process.Pid - logger = logger.With().Int("pid", ffmpegPID).Logger() - - reader := bufio.NewReaderSize(outPipe, StdoutBufferSize) logChunker := NewLogChunker(taskID, ce.listener, ce.timeService) + subprocessErr := ce.cli.RunWithTextOutput(ctx, logger, execCmd, logChunker, nil) - for { - lineBytes, isPrefix, readErr := reader.ReadLine() - if readErr == io.EOF { - break - } - if readErr != nil { - logger.Error().Err(err).Msg("error reading stdout/err") - return err - } - line := string(lineBytes) - if isPrefix { - logger.Warn(). - Str("line", fmt.Sprintf("%s...", line[:256])). - Int("lineLength", len(line)). - Msg("unexpectedly long line read, truncating") - } - - logger.Debug().Msg(line) - if err := logChunker.Append(ctx, fmt.Sprintf("pid=%d > %s", ffmpegPID, line)); err != nil { - return fmt.Errorf("appending log entry to log chunker: %w", err) - } - } - if err := logChunker.Flush(ctx); err != nil { - return fmt.Errorf("flushing log chunker: %w", err) - } - - if err := execCmd.Wait(); err != nil { - logger.Error().Err(err).Msg("error in CLI execution") - return err - } - - if execCmd.ProcessState.Success() { - logger.Info().Msg("command exited succesfully") - } else { - logger.Error(). + if subprocessErr != nil { + logger.Error().Err(subprocessErr). Int("exitCode", execCmd.ProcessState.ExitCode()). Msg("command exited abnormally") - return fmt.Errorf("command exited abnormally with code %d", execCmd.ProcessState.ExitCode()) + return subprocessErr } + logger.Info().Msg("command exited succesfully") return nil } diff --git a/internal/worker/mocks/cli_runner.gen.go b/internal/worker/mocks/cli_runner.gen.go index 9df5a2d5..323806df 100644 --- a/internal/worker/mocks/cli_runner.gen.go +++ b/internal/worker/mocks/cli_runner.gen.go @@ -9,7 +9,9 @@ import ( exec "os/exec" reflect "reflect" + cli_runner "git.blender.org/flamenco/internal/worker/cli_runner" gomock "github.com/golang/mock/gomock" + zerolog "github.com/rs/zerolog" ) // MockCommandLineRunner is a mock of CommandLineRunner interface. @@ -53,3 +55,17 @@ func (mr *MockCommandLineRunnerMockRecorder) CommandContext(arg0, arg1 interface varargs := append([]interface{}{arg0, arg1}, arg2...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CommandContext", reflect.TypeOf((*MockCommandLineRunner)(nil).CommandContext), varargs...) } + +// RunWithTextOutput mocks base method. +func (m *MockCommandLineRunner) RunWithTextOutput(arg0 context.Context, arg1 zerolog.Logger, arg2 *exec.Cmd, arg3 cli_runner.LogChunker, arg4 chan<- string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RunWithTextOutput", arg0, arg1, arg2, arg3, arg4) + ret0, _ := ret[0].(error) + return ret0 +} + +// RunWithTextOutput indicates an expected call of RunWithTextOutput. +func (mr *MockCommandLineRunnerMockRecorder) RunWithTextOutput(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunWithTextOutput", reflect.TypeOf((*MockCommandLineRunner)(nil).RunWithTextOutput), arg0, arg1, arg2, arg3, arg4) +}