Worker: avoid task errors on shutdown

When shutting down, the main worker context closes. This causes the
subprocess to be killed, which in turn caused a task execution error. This
now no longer happens, as such errors are expected on shutdown and do not
indicate task failure.
This commit is contained in:
Sybren A. Stüvel 2022-04-08 14:32:25 +02:00
parent 96e5e84881
commit 62ea7dcf1d
3 changed files with 17 additions and 5 deletions

View File

@ -109,5 +109,12 @@ func (l *Listener) OutputProduced(ctx context.Context, taskID string, outputLoca
} }
func (l *Listener) sendTaskUpdate(ctx context.Context, taskID string, update api.TaskUpdateJSONRequestBody) error { func (l *Listener) sendTaskUpdate(ctx context.Context, taskID string, update api.TaskUpdateJSONRequestBody) error {
// Check whether the context is closed before doing anything.
select {
default:
case <-ctx.Done():
return ctx.Err()
}
return l.buffer.SendTaskUpdate(ctx, taskID, update) return l.buffer.SendTaskUpdate(ctx, taskID, update)
} }

View File

@ -4,6 +4,7 @@ package worker
import ( import (
"context" "context"
"errors"
"net/http" "net/http"
"time" "time"
@ -55,10 +56,9 @@ func (w *Worker) runStateAwake(ctx context.Context) {
// to the Manager. This code only needs to fetch a task and run it. // to the Manager. This code only needs to fetch a task and run it.
err := w.taskRunner.Run(ctx, *task) err := w.taskRunner.Run(ctx, *task)
if err != nil { if err != nil {
select { if errors.Is(err, context.Canceled) {
case <-ctx.Done(): log.Warn().Interface("task", *task).Msg("task aborted due to context being closed")
log.Warn().Err(err).Interface("task", *task).Msg("task aborted due to context being closed") } else {
default:
log.Warn().Err(err).Interface("task", *task).Msg("error executing task") log.Warn().Err(err).Interface("task", *task).Msg("error executing task")
} }
} }
@ -72,7 +72,6 @@ func (w *Worker) runStateAwake(ctx context.Context) {
// Returns nil when a task could not be obtained and the period loop was cancelled. // Returns nil when a task could not be obtained and the period loop was cancelled.
func (w *Worker) fetchTask(ctx context.Context) *api.AssignedTask { func (w *Worker) fetchTask(ctx context.Context) *api.AssignedTask {
logger := log.With().Str("status", string(w.state)).Logger() logger := log.With().Str("status", string(w.state)).Logger()
logger.Info().Msg("fetching tasks")
// Initially don't wait at all. // Initially don't wait at all.
var wait time.Duration var wait time.Duration
@ -88,6 +87,7 @@ func (w *Worker) fetchTask(ctx context.Context) *api.AssignedTask {
case <-time.After(wait): case <-time.After(wait):
} }
logger.Info().Msg("fetching tasks")
resp, err := w.client.ScheduleTaskWithResponse(ctx) resp, err := w.client.ScheduleTaskWithResponse(ctx)
if err != nil { if err != nil {
log.Error().Err(err).Msg("error obtaining task") log.Error().Err(err).Msg("error obtaining task")

View File

@ -4,6 +4,7 @@ package worker
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@ -72,6 +73,10 @@ func (te *TaskExecutor) Run(ctx context.Context, task api.AssignedTask) error {
// All was fine, go run the next command. // All was fine, go run the next command.
continue continue
} }
if errors.Is(runErr, context.Canceled) {
logger.Warn().Msg("task execution aborted due to context shutdown")
return nil
}
// Notify Manager that this task failed. // Notify Manager that this task failed.
if err := te.listener.TaskFailed(ctx, task.Uuid, runErr.Error()); err != nil { if err := te.listener.TaskFailed(ctx, task.Uuid, runErr.Error()); err != nil {