diff --git a/internal/worker/listener.go b/internal/worker/listener.go index f2ecc063..ec1c88b9 100644 --- a/internal/worker/listener.go +++ b/internal/worker/listener.go @@ -109,5 +109,12 @@ func (l *Listener) OutputProduced(ctx context.Context, taskID string, outputLoca } func (l *Listener) sendTaskUpdate(ctx context.Context, taskID string, update api.TaskUpdateJSONRequestBody) error { + // Check whether the context is closed before doing anything. + select { + default: + case <-ctx.Done(): + return ctx.Err() + } + return l.buffer.SendTaskUpdate(ctx, taskID, update) } diff --git a/internal/worker/state_awake.go b/internal/worker/state_awake.go index 90f71e7e..ca1b3301 100644 --- a/internal/worker/state_awake.go +++ b/internal/worker/state_awake.go @@ -4,6 +4,7 @@ package worker import ( "context" + "errors" "net/http" "time" @@ -55,10 +56,9 @@ func (w *Worker) runStateAwake(ctx context.Context) { // to the Manager. This code only needs to fetch a task and run it. err := w.taskRunner.Run(ctx, *task) if err != nil { - select { - case <-ctx.Done(): - log.Warn().Err(err).Interface("task", *task).Msg("task aborted due to context being closed") - default: + if errors.Is(err, context.Canceled) { + log.Warn().Interface("task", *task).Msg("task aborted due to context being closed") + } else { log.Warn().Err(err).Interface("task", *task).Msg("error executing task") } } @@ -72,7 +72,6 @@ func (w *Worker) runStateAwake(ctx context.Context) { // Returns nil when a task could not be obtained and the period loop was cancelled. func (w *Worker) fetchTask(ctx context.Context) *api.AssignedTask { logger := log.With().Str("status", string(w.state)).Logger() - logger.Info().Msg("fetching tasks") // Initially don't wait at all. var wait time.Duration @@ -88,6 +87,7 @@ func (w *Worker) fetchTask(ctx context.Context) *api.AssignedTask { case <-time.After(wait): } + logger.Info().Msg("fetching tasks") resp, err := w.client.ScheduleTaskWithResponse(ctx) if err != nil { log.Error().Err(err).Msg("error obtaining task") diff --git a/internal/worker/task_executor.go b/internal/worker/task_executor.go index 178f9c29..c56c33f8 100644 --- a/internal/worker/task_executor.go +++ b/internal/worker/task_executor.go @@ -4,6 +4,7 @@ package worker import ( "context" + "errors" "fmt" "github.com/rs/zerolog/log" @@ -72,6 +73,10 @@ func (te *TaskExecutor) Run(ctx context.Context, task api.AssignedTask) error { // All was fine, go run the next command. continue } + if errors.Is(runErr, context.Canceled) { + logger.Warn().Msg("task execution aborted due to context shutdown") + return nil + } // Notify Manager that this task failed. if err := te.listener.TaskFailed(ctx, task.Uuid, runErr.Error()); err != nil {