From 3057a009e7d86fd1c591ea065bdc7611dbce6294 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Thu, 17 Feb 2022 14:02:45 +0100 Subject: [PATCH] Worker: better handling of task update errors --- internal/worker/listener.go | 4 ++-- internal/worker/task_executor.go | 28 +++++++++++++++++++++------- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/internal/worker/listener.go b/internal/worker/listener.go index 700e6087..5f030508 100644 --- a/internal/worker/listener.go +++ b/internal/worker/listener.go @@ -36,7 +36,7 @@ var _ CommandListener = (*Listener)(nil) var _ TaskExecutionListener = (*Listener)(nil) var ( - ErrTaskReassigned = errors.New("task was reassigned to other worker") + ErrTaskReassigned = errors.New("task was not assigned to this worker") ) // Listener listens to the result of task and command execution, and sends it to the Manager. @@ -97,7 +97,7 @@ func (l *Listener) TaskStarted(ctx context.Context, taskID string) error { case http.StatusConflict: return ErrTaskReassigned default: - return fmt.Errorf("unknown error from Manager: %v", resp.JSONDefault) + return fmt.Errorf("unknown error from Manager, code %d: %v", resp.StatusCode(), resp.JSONDefault) } } diff --git a/internal/worker/task_executor.go b/internal/worker/task_executor.go index ce0207e8..546cb64b 100644 --- a/internal/worker/task_executor.go +++ b/internal/worker/task_executor.go @@ -61,11 +61,16 @@ func NewTaskExecutor(cmdRunner CommandRunner, listener TaskExecutionListener) *T } } +// Run runs a task. +// Returns ErrTaskReassigned when the task was reassigned to another worker. func (te *TaskExecutor) Run(ctx context.Context, task api.AssignedTask) error { logger := log.With().Str("task", task.Uuid).Logger() logger.Info().Str("taskType", task.TaskType).Msg("starting task") if err := te.listener.TaskStarted(ctx, task.Uuid); err != nil { + if err == ErrTaskReassigned { + return ErrTaskReassigned + } return fmt.Errorf("error sending 'task started' notification to manager: %w", err) } @@ -79,17 +84,26 @@ func (te *TaskExecutor) Run(ctx context.Context, task api.AssignedTask) error { default: } - err := te.cmdRunner.Run(ctx, task.Uuid, cmd) - - if err != nil { - if err := te.listener.TaskFailed(ctx, task.Uuid, err.Error()); err != nil { - return fmt.Errorf("error sending 'task failed' notification to manager: %w", err) - } - return err + runErr := te.cmdRunner.Run(ctx, task.Uuid, cmd) + if runErr == nil { + // All was fine, go run the next command. + continue } + + // Notify Manager that this task failed. + if err := te.listener.TaskFailed(ctx, task.Uuid, runErr.Error()); err != nil { + if err == ErrTaskReassigned { + return ErrTaskReassigned + } + return fmt.Errorf("error sending 'task failed' notification to manager: %w", err) + } + return runErr } if err := te.listener.TaskCompleted(ctx, task.Uuid); err != nil { + if err == ErrTaskReassigned { + return ErrTaskReassigned + } return fmt.Errorf("error sending 'task completed' notification to manager: %w", err) }