
Change the package base name of the Go code, from `git.blender.org/flamenco` to `projects.blender.org/studio/flamenco`. The old location, `git.blender.org`, has no longer been use since the [migration to Gitea][1]. The new package names now reflect the actual location where Flamenco is hosted. [1]: https://code.blender.org/2023/02/new-blender-development-infrastructure/
101 lines
2.9 KiB
Go
101 lines
2.9 KiB
Go
package worker
|
|
|
|
// SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
|
|
"projects.blender.org/studio/flamenco/pkg/api"
|
|
)
|
|
|
|
type CommandRunner interface {
|
|
Run(ctx context.Context, taskID string, cmd api.Command) error
|
|
}
|
|
|
|
// Generate mock implementation of this interface.
|
|
//go:generate go run github.com/golang/mock/mockgen -destination mocks/task_exe_listener.gen.go -package mocks projects.blender.org/studio/flamenco/internal/worker TaskExecutionListener
|
|
|
|
// TaskExecutionListener sends task lifecycle events (start/fail/complete) to the Manager.
|
|
type TaskExecutionListener interface {
|
|
// TaskStarted tells the Manager that task execution has started.
|
|
TaskStarted(ctx context.Context, taskID string) error
|
|
|
|
// TaskFailed tells the Manager the task failed for some reason.
|
|
TaskFailed(ctx context.Context, taskID string, reason string) error
|
|
|
|
// TaskCompleted tells the Manager the task has been completed.
|
|
TaskCompleted(ctx context.Context, taskID string) error
|
|
}
|
|
|
|
type TaskExecutor struct {
|
|
cmdRunner CommandRunner
|
|
listener TaskExecutionListener
|
|
}
|
|
|
|
var _ TaskRunner = (*TaskExecutor)(nil)
|
|
|
|
func NewTaskExecutor(cmdRunner CommandRunner, listener TaskExecutionListener) *TaskExecutor {
|
|
return &TaskExecutor{
|
|
cmdRunner: cmdRunner,
|
|
listener: listener,
|
|
}
|
|
}
|
|
|
|
// Run runs a task.
|
|
// Returns ErrTaskReassigned when the task was reassigned to another worker.
|
|
func (te *TaskExecutor) Run(ctx context.Context, task api.AssignedTask) error {
|
|
logger := log.With().
|
|
Str("task", task.Uuid).
|
|
Str("job", task.Job).
|
|
Logger()
|
|
logger.Info().Str("taskType", task.TaskType).Msg("starting task")
|
|
|
|
if err := te.listener.TaskStarted(ctx, task.Uuid); err != nil {
|
|
if err == ErrTaskReassigned {
|
|
return ErrTaskReassigned
|
|
}
|
|
return fmt.Errorf("sending 'task started' notification to manager: %w", err)
|
|
}
|
|
|
|
for _, cmd := range task.Commands {
|
|
if ctx.Err() != nil {
|
|
// Shutdown does not mean task failure; cleanly shutting down will hand
|
|
// back the task for requeueing on the Manager.
|
|
logger.Warn().Msg("task execution aborted due to context shutdown")
|
|
return ctx.Err()
|
|
}
|
|
|
|
runErr := te.cmdRunner.Run(ctx, task.Uuid, cmd)
|
|
if runErr == nil {
|
|
// All was fine, go run the next command.
|
|
continue
|
|
}
|
|
if errors.Is(runErr, context.Canceled) {
|
|
logger.Warn().Msg("task execution aborted due to context shutdown")
|
|
return nil
|
|
}
|
|
|
|
// Notify Manager that this task failed.
|
|
if err := te.listener.TaskFailed(ctx, task.Uuid, runErr.Error()); err != nil {
|
|
if err == ErrTaskReassigned {
|
|
return ErrTaskReassigned
|
|
}
|
|
return fmt.Errorf("sending 'task failed' notification to manager: %w", err)
|
|
}
|
|
return runErr
|
|
}
|
|
|
|
if err := te.listener.TaskCompleted(ctx, task.Uuid); err != nil {
|
|
if err == ErrTaskReassigned {
|
|
return ErrTaskReassigned
|
|
}
|
|
return fmt.Errorf("sending 'task completed' notification to manager: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|