diff --git a/internal/worker/command_blender.go b/internal/worker/command_blender.go index 76fdbe7a..7eda3bd9 100644 --- a/internal/worker/command_blender.go +++ b/internal/worker/command_blender.go @@ -31,7 +31,7 @@ type BlenderParameters struct { // cmdBlender executes the "blender-render" command. func (ce *CommandExecutor) cmdBlenderRender(ctx context.Context, logger zerolog.Logger, taskID string, cmd api.Command) error { cmdCtx, cmdCtxCancel := context.WithCancel(ctx) - defer cmdCtxCancel() + defer cmdCtxCancel() // Ensure the subprocess exits whenever this function returns. execCmd, err := ce.cmdBlenderRenderCommand(cmdCtx, logger, taskID, cmd) if err != nil { @@ -73,9 +73,13 @@ func (ce *CommandExecutor) cmdBlenderRender(ctx context.Context, logger zerolog. } logger.Debug().Msg(line) - logChunker.Append(ctx, fmt.Sprintf("pid=%d > %s", blenderPID, line)) + if err := logChunker.Append(ctx, fmt.Sprintf("pid=%d > %s", blenderPID, line)); err != nil { + return fmt.Errorf("appending log entry to log chunker: %w", err) + } + } + if err := logChunker.Flush(ctx); err != nil { + return fmt.Errorf("flushing log chunker: %w", err) } - logChunker.Flush(ctx) if err := execCmd.Wait(); err != nil { logger.Error().Err(err).Msg("error in CLI execution") diff --git a/internal/worker/command_ffmpeg.go b/internal/worker/command_ffmpeg.go index 10e1cb14..1a6a1226 100644 --- a/internal/worker/command_ffmpeg.go +++ b/internal/worker/command_ffmpeg.go @@ -73,9 +73,13 @@ func (ce *CommandExecutor) cmdFramesToVideo(ctx context.Context, logger zerolog. } logger.Debug().Msg(line) - logChunker.Append(ctx, fmt.Sprintf("pid=%d > %s", ffmpegPID, line)) + if err := logChunker.Append(ctx, fmt.Sprintf("pid=%d > %s", ffmpegPID, line)); err != nil { + return fmt.Errorf("appending log entry to log chunker: %w", err) + } + } + if err := logChunker.Flush(ctx); err != nil { + return fmt.Errorf("flushing log chunker: %w", err) } - logChunker.Flush(ctx) if err := execCmd.Wait(); err != nil { logger.Error().Err(err).Msg("error in CLI execution") diff --git a/internal/worker/command_file_mgmt_test.go b/internal/worker/command_file_mgmt_test.go index 1b9f33e7..c51da53c 100644 --- a/internal/worker/command_file_mgmt_test.go +++ b/internal/worker/command_file_mgmt_test.go @@ -156,7 +156,9 @@ func TestTimestampedPathFile(t *testing.T) { assert.NoError(t, err) fileCreateEmpty("somefile.txt") - os.Chtimes("somefile.txt", mtime, mtime) + if err := os.Chtimes("somefile.txt", mtime, mtime); err != nil { + t.Fatalf(err.Error()) + } newpath, err := timestampedPath("somefile.txt") @@ -174,8 +176,12 @@ func TestTimestampedPathDir(t *testing.T) { mtime, err := time.Parse(time.RFC3339, "2006-01-02T15:04:05-07:00") assert.NoError(t, err) - os.Mkdir("somedir", os.ModePerm) - os.Chtimes("somedir", mtime, mtime) + if err := os.Mkdir("somedir", os.ModePerm); err != nil { + t.Fatal(err.Error()) + } + if err := os.Chtimes("somedir", mtime, mtime); err != nil { + t.Fatal(err.Error()) + } newpath, err := timestampedPath("somedir") diff --git a/internal/worker/upstream_buffer.go b/internal/worker/upstream_buffer.go index 753b422e..d0455411 100644 --- a/internal/worker/upstream_buffer.go +++ b/internal/worker/upstream_buffer.go @@ -139,7 +139,7 @@ func (ub *UpstreamBufferDB) prepareDatabase(ctx context.Context) error { if err != nil { return fmt.Errorf("beginning database transaction: %w", err) } - defer tx.Rollback() + defer rollbackTransaction(tx) stmt := `CREATE TABLE IF NOT EXISTS task_update_queue(task_id VARCHAR(36), payload BLOB)` log.Debug().Str("sql", stmt).Msg("creating database table") @@ -185,7 +185,7 @@ func (ub *UpstreamBufferDB) queueTaskUpdate(ctx context.Context, taskID string, if err != nil { return fmt.Errorf("beginning database transaction: %w", err) } - defer tx.Rollback() + defer rollbackTransaction(tx) blob, err := json.Marshal(update) if err != nil { @@ -241,7 +241,7 @@ func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err if err != nil { return false, fmt.Errorf("beginning database transaction: %w", err) } - defer tx.Rollback() + defer rollbackTransaction(tx) stmt := `SELECT rowid, task_id, payload FROM task_update_queue ORDER BY rowid LIMIT 1` log.Trace().Str("sql", stmt).Msg("fetching queued task updates") @@ -332,3 +332,9 @@ func (ub *UpstreamBufferDB) periodicFlushLoop() { } } } + +func rollbackTransaction(tx *sql.Tx) { + if err := tx.Rollback(); err != nil { + log.Error().Err(err).Msg("rolling back transaction") + } +}