Worker: add various extra error checks

This commit is contained in:
Sybren A. Stüvel 2022-04-08 14:47:20 +02:00
parent 1960b668aa
commit e7fc2c6f6e
4 changed files with 31 additions and 11 deletions

View File

@ -31,7 +31,7 @@ type BlenderParameters struct {
// cmdBlender executes the "blender-render" command.
func (ce *CommandExecutor) cmdBlenderRender(ctx context.Context, logger zerolog.Logger, taskID string, cmd api.Command) error {
cmdCtx, cmdCtxCancel := context.WithCancel(ctx)
defer cmdCtxCancel()
defer cmdCtxCancel() // Ensure the subprocess exits whenever this function returns.
execCmd, err := ce.cmdBlenderRenderCommand(cmdCtx, logger, taskID, cmd)
if err != nil {
@ -73,9 +73,13 @@ func (ce *CommandExecutor) cmdBlenderRender(ctx context.Context, logger zerolog.
}
logger.Debug().Msg(line)
logChunker.Append(ctx, fmt.Sprintf("pid=%d > %s", blenderPID, line))
if err := logChunker.Append(ctx, fmt.Sprintf("pid=%d > %s", blenderPID, line)); err != nil {
return fmt.Errorf("appending log entry to log chunker: %w", err)
}
}
if err := logChunker.Flush(ctx); err != nil {
return fmt.Errorf("flushing log chunker: %w", err)
}
logChunker.Flush(ctx)
if err := execCmd.Wait(); err != nil {
logger.Error().Err(err).Msg("error in CLI execution")

View File

@ -73,9 +73,13 @@ func (ce *CommandExecutor) cmdFramesToVideo(ctx context.Context, logger zerolog.
}
logger.Debug().Msg(line)
logChunker.Append(ctx, fmt.Sprintf("pid=%d > %s", ffmpegPID, line))
if err := logChunker.Append(ctx, fmt.Sprintf("pid=%d > %s", ffmpegPID, line)); err != nil {
return fmt.Errorf("appending log entry to log chunker: %w", err)
}
}
if err := logChunker.Flush(ctx); err != nil {
return fmt.Errorf("flushing log chunker: %w", err)
}
logChunker.Flush(ctx)
if err := execCmd.Wait(); err != nil {
logger.Error().Err(err).Msg("error in CLI execution")

View File

@ -156,7 +156,9 @@ func TestTimestampedPathFile(t *testing.T) {
assert.NoError(t, err)
fileCreateEmpty("somefile.txt")
os.Chtimes("somefile.txt", mtime, mtime)
if err := os.Chtimes("somefile.txt", mtime, mtime); err != nil {
t.Fatalf(err.Error())
}
newpath, err := timestampedPath("somefile.txt")
@ -174,8 +176,12 @@ func TestTimestampedPathDir(t *testing.T) {
mtime, err := time.Parse(time.RFC3339, "2006-01-02T15:04:05-07:00")
assert.NoError(t, err)
os.Mkdir("somedir", os.ModePerm)
os.Chtimes("somedir", mtime, mtime)
if err := os.Mkdir("somedir", os.ModePerm); err != nil {
t.Fatal(err.Error())
}
if err := os.Chtimes("somedir", mtime, mtime); err != nil {
t.Fatal(err.Error())
}
newpath, err := timestampedPath("somedir")

View File

@ -139,7 +139,7 @@ func (ub *UpstreamBufferDB) prepareDatabase(ctx context.Context) error {
if err != nil {
return fmt.Errorf("beginning database transaction: %w", err)
}
defer tx.Rollback()
defer rollbackTransaction(tx)
stmt := `CREATE TABLE IF NOT EXISTS task_update_queue(task_id VARCHAR(36), payload BLOB)`
log.Debug().Str("sql", stmt).Msg("creating database table")
@ -185,7 +185,7 @@ func (ub *UpstreamBufferDB) queueTaskUpdate(ctx context.Context, taskID string,
if err != nil {
return fmt.Errorf("beginning database transaction: %w", err)
}
defer tx.Rollback()
defer rollbackTransaction(tx)
blob, err := json.Marshal(update)
if err != nil {
@ -241,7 +241,7 @@ func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err
if err != nil {
return false, fmt.Errorf("beginning database transaction: %w", err)
}
defer tx.Rollback()
defer rollbackTransaction(tx)
stmt := `SELECT rowid, task_id, payload FROM task_update_queue ORDER BY rowid LIMIT 1`
log.Trace().Str("sql", stmt).Msg("fetching queued task updates")
@ -332,3 +332,9 @@ func (ub *UpstreamBufferDB) periodicFlushLoop() {
}
}
}
func rollbackTransaction(tx *sql.Tx) {
if err := tx.Rollback(); err != nil {
log.Error().Err(err).Msg("rolling back transaction")
}
}