flamenco/internal/worker/state_awake.go
package worker

// SPDX-License-Identifier: GPL-3.0-or-later

import (
	"context"
	"errors"
	"net/http"
	"time"

	"github.com/rs/zerolog/log"

	"git.blender.org/flamenco/pkg/api"
)
const (
	// How long to wait to fetch another task...
	durationNoTask       = 2 * time.Second  // ... if there is no task now.
	durationFetchFailed  = 10 * time.Second // ... if fetching failed somehow.
	durationTaskComplete = 2 * time.Second  // ... when a task was completed.
)
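// gotoStateAwake marks the worker as awake, acknowledges the new status to the
// Manager, and starts the task-fetching loop in a background goroutine.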
func (w *Worker) gotoStateAwake(ctx context.Context) {
	w.stateMutex.Lock()
	w.state = api.WorkerStatusAwake
	w.stateMutex.Unlock()

	w.doneWg.Add(2)
	w.ackStateChange(ctx, w.state)
	go w.runStateAwake(ctx)
}

// runStateAwake fetches a task and executes it, in an endless loop.
func (w *Worker) runStateAwake(ctx context.Context) {
	defer func() {
		err := recover()
		if err != nil {
			w.SignOff(ctx)
			logger := w.loggerWithStatus()
			logger.Panic().
				Interface("panic", err).
				Msg("panic, so signed off and going to stop")
		}
	}()
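	// Deferred calls run in LIFO order: the 'stopping' log line and the
	// WaitGroup release below execute before the panic handler above.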
	defer w.doneWg.Done()
	defer log.Debug().Msg("stopping state 'awake'")
	for {
		task := w.fetchTask(ctx)
		if task == nil {
			return
		}

		// The task runner's listener will be responsible for sending results back
		// to the Manager. This code only needs to fetch a task and run it.
		err := w.taskRunner.Run(ctx, *task)
		if err != nil {
			if errors.Is(err, context.Canceled) {
				log.Warn().Interface("task", *task).Msg("task aborted due to context being closed")
			} else {
				log.Warn().Err(err).Interface("task", *task).Msg("error executing task")
			}
		}

		// Do some rate limiting. This is mostly useful while developing.
		time.Sleep(durationTaskComplete)
	}
}

// fetchTask periodically tries to fetch a task from the Manager, returning it when obtained.
// Returns nil when a task could not be obtained and the polling loop was cancelled.
func (w *Worker) fetchTask(ctx context.Context) *api.AssignedTask {
	logger := w.loggerWithStatus()

	// Initially don't wait at all.
	var wait time.Duration

	for {
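		// Each iteration waits for the current back-off duration before asking
		// the Manager for a task; the duration is adjusted below depending on
		// the response.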
		select {
		case <-ctx.Done():
			logger.Debug().Msg("task fetching interrupted by context cancellation")
			return nil
		case <-w.doneChan:
			logger.Debug().Msg("task fetching interrupted by shutdown")
			return nil
		case <-time.After(wait):
		}

		logger.Debug().Msg("fetching tasks")
		resp, err := w.client.ScheduleTaskWithResponse(ctx)
		if err != nil {
			log.Error().Err(err).Msg("error obtaining task")
			wait = durationFetchFailed
			continue
		}
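		// Handle the Manager's response: 200 means a task was assigned,
		// 423 means the Manager wants this worker in a different state,
		// 403 means access was denied, and 204 means no task is available.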
		switch {
		case resp.JSON200 != nil:
			log.Info().
				Interface("task", resp.JSON200).
				Msg("obtained task")
			return resp.JSON200
		case resp.JSON423 != nil:
			log.Info().
				Str("requestedStatus", string(resp.JSON423.StatusRequested)).
				Msg("Manager requests status change")
			w.changeState(ctx, resp.JSON423.StatusRequested)
			return nil
		case resp.JSON403 != nil:
			log.Error().
				Int("code", resp.StatusCode()).
				Str("error", string(resp.JSON403.Message)).
				Msg("access denied")
			wait = durationFetchFailed
		case resp.StatusCode() == http.StatusNoContent:
			log.Debug().Msg("no task available")
			// TODO: implement gradual back-off, to avoid too frequent checks when the
			// farm is idle.
			wait = durationNoTask
		default:
			log.Warn().
				Int("code", resp.StatusCode()).
				Str("error", string(resp.Body)).
				Msg("unable to obtain task for unknown reason")
			wait = durationFetchFailed
		}
	}
}