flamenco/internal/worker/state_awake.go
Sybren A. Stüvel 9f5e4cc0cc License: license all code under "GPL-3.0-or-later"
The add-on code was copy-pasted from other addons and used the GPL v2
license, whereas by accident the LICENSE text file had the GNU "Affero" GPL
license v3 (instead of regular GPL v3).

This is now all streamlined, and all code is licensed as "GPL v3 or later".

Furthermore, the code comments just show a SPDX License Identifier
instead of an entire license block.
2022-03-07 15:26:46 +01:00

123 lines
3.1 KiB
Go

package worker
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"net/http"
"time"
"github.com/rs/zerolog/log"
"git.blender.org/flamenco/pkg/api"
)
const (
// How long to wait to fetch another task...
durationNoTask = 5 * time.Second // ... if there is no task now.
durationFetchFailed = 10 * time.Second // ... if fetching failed somehow.
)
func (w *Worker) gotoStateAwake(ctx context.Context) {
w.stateMutex.Lock()
w.state = api.WorkerStatusAwake
w.stateMutex.Unlock()
w.doneWg.Add(2)
w.ackStateChange(ctx, w.state)
go w.runStateAwake(ctx)
}
// runStateAwake fetches a task and executes it, in an endless loop.
func (w *Worker) runStateAwake(ctx context.Context) {
defer func() {
err := recover()
if err != nil {
w.SignOff(ctx)
log.Panic().
Interface("panic", err).
Str("workerStatus", string(w.state)).
Msg("panic, so signed off and going to stop")
}
}()
defer w.doneWg.Done()
defer log.Debug().Msg("stopping state 'awake'")
for {
task := w.fetchTask(ctx)
if task == nil {
return
}
// The task runner's listener will be responsible for sending results back
// to the Manager. This code only needs to fetch a task and run it.
err := w.taskRunner.Run(ctx, *task)
if err != nil {
log.Warn().Err(err).Interface("task", *task).Msg("error executing task")
}
// Do some rate limiting. This is mostly useful while developing.
time.Sleep(2 * time.Second)
}
}
// fetchTasks periodically tries to fetch a task from the Manager, returning it when obtained.
// Returns nil when a task could not be obtained and the period loop was cancelled.
func (w *Worker) fetchTask(ctx context.Context) *api.AssignedTask {
logger := log.With().Str("status", string(w.state)).Logger()
logger.Info().Msg("fetching tasks")
// Initially don't wait at all.
var wait time.Duration
for {
select {
case <-ctx.Done():
logger.Debug().Msg("task fetching interrupted by context cancellation")
return nil
case <-w.doneChan:
logger.Debug().Msg("task fetching interrupted by shutdown")
return nil
case <-time.After(wait):
}
resp, err := w.client.ScheduleTaskWithResponse(ctx)
if err != nil {
log.Error().Err(err).Msg("error obtaining task")
wait = durationFetchFailed
continue
}
switch {
case resp.JSON200 != nil:
log.Info().
Interface("task", resp.JSON200).
Msg("obtained task")
return resp.JSON200
case resp.JSON423 != nil:
log.Info().
Str("requestedStatus", string(resp.JSON423.StatusRequested)).
Msg("Manager requests status change")
w.changeState(ctx, resp.JSON423.StatusRequested)
return nil
case resp.JSON403 != nil:
log.Error().
Int("code", resp.StatusCode()).
Str("error", string(resp.JSON403.Message)).
Msg("access denied")
wait = durationFetchFailed
case resp.StatusCode() == http.StatusNoContent:
log.Info().Msg("no task available")
wait = durationNoTask
default:
log.Warn().
Int("code", resp.StatusCode()).
Str("error", string(resp.Body)).
Msg("unable to obtain task for unknown reason")
wait = durationFetchFailed
}
}
}