Worker: better handling of task update errors
This commit is contained in:
parent
0ab8151a92
commit
3057a009e7
@ -36,7 +36,7 @@ var _ CommandListener = (*Listener)(nil)
|
|||||||
var _ TaskExecutionListener = (*Listener)(nil)
|
var _ TaskExecutionListener = (*Listener)(nil)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrTaskReassigned = errors.New("task was reassigned to other worker")
|
ErrTaskReassigned = errors.New("task was not assigned to this worker")
|
||||||
)
|
)
|
||||||
|
|
||||||
// Listener listens to the result of task and command execution, and sends it to the Manager.
|
// Listener listens to the result of task and command execution, and sends it to the Manager.
|
||||||
@ -97,7 +97,7 @@ func (l *Listener) TaskStarted(ctx context.Context, taskID string) error {
|
|||||||
case http.StatusConflict:
|
case http.StatusConflict:
|
||||||
return ErrTaskReassigned
|
return ErrTaskReassigned
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("unknown error from Manager: %v", resp.JSONDefault)
|
return fmt.Errorf("unknown error from Manager, code %d: %v", resp.StatusCode(), resp.JSONDefault)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -61,11 +61,16 @@ func NewTaskExecutor(cmdRunner CommandRunner, listener TaskExecutionListener) *T
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Run runs a task.
|
||||||
|
// Returns ErrTaskReassigned when the task was reassigned to another worker.
|
||||||
func (te *TaskExecutor) Run(ctx context.Context, task api.AssignedTask) error {
|
func (te *TaskExecutor) Run(ctx context.Context, task api.AssignedTask) error {
|
||||||
logger := log.With().Str("task", task.Uuid).Logger()
|
logger := log.With().Str("task", task.Uuid).Logger()
|
||||||
logger.Info().Str("taskType", task.TaskType).Msg("starting task")
|
logger.Info().Str("taskType", task.TaskType).Msg("starting task")
|
||||||
|
|
||||||
if err := te.listener.TaskStarted(ctx, task.Uuid); err != nil {
|
if err := te.listener.TaskStarted(ctx, task.Uuid); err != nil {
|
||||||
|
if err == ErrTaskReassigned {
|
||||||
|
return ErrTaskReassigned
|
||||||
|
}
|
||||||
return fmt.Errorf("error sending 'task started' notification to manager: %w", err)
|
return fmt.Errorf("error sending 'task started' notification to manager: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -79,17 +84,26 @@ func (te *TaskExecutor) Run(ctx context.Context, task api.AssignedTask) error {
|
|||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
err := te.cmdRunner.Run(ctx, task.Uuid, cmd)
|
runErr := te.cmdRunner.Run(ctx, task.Uuid, cmd)
|
||||||
|
if runErr == nil {
|
||||||
if err != nil {
|
// All was fine, go run the next command.
|
||||||
if err := te.listener.TaskFailed(ctx, task.Uuid, err.Error()); err != nil {
|
continue
|
||||||
return fmt.Errorf("error sending 'task failed' notification to manager: %w", err)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Notify Manager that this task failed.
|
||||||
|
if err := te.listener.TaskFailed(ctx, task.Uuid, runErr.Error()); err != nil {
|
||||||
|
if err == ErrTaskReassigned {
|
||||||
|
return ErrTaskReassigned
|
||||||
|
}
|
||||||
|
return fmt.Errorf("error sending 'task failed' notification to manager: %w", err)
|
||||||
|
}
|
||||||
|
return runErr
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := te.listener.TaskCompleted(ctx, task.Uuid); err != nil {
|
if err := te.listener.TaskCompleted(ctx, task.Uuid); err != nil {
|
||||||
|
if err == ErrTaskReassigned {
|
||||||
|
return ErrTaskReassigned
|
||||||
|
}
|
||||||
return fmt.Errorf("error sending 'task completed' notification to manager: %w", err)
|
return fmt.Errorf("error sending 'task completed' notification to manager: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user