Manager: move task requeueing to TaskStateMachine
Requeueing the tasks of a specific worker is now done in the `TaskStateMachine`, such that it can be called from other services as well in future commits. This also makes the `LogStorage` service a dependency of the `TaskStateMachine`, as it needs to write "this task was requeued" kind of messages to the task logs.
This commit is contained in:
parent
e06bc484f4
commit
c3525c3b1a
@ -109,8 +109,8 @@ func main() {
|
||||
|
||||
timeService := clock.New()
|
||||
webUpdater := webupdates.New()
|
||||
taskStateMachine := task_state_machine.NewStateMachine(persist, webUpdater)
|
||||
logStorage := task_logs.NewStorage(configService.Get().TaskLogsPath, timeService, webUpdater)
|
||||
taskStateMachine := task_state_machine.NewStateMachine(persist, webUpdater, logStorage)
|
||||
flamenco := buildFlamencoAPI(timeService, configService, persist, taskStateMachine, logStorage, webUpdater)
|
||||
e := buildWebService(flamenco, persist, ssdp, webUpdater, urls)
|
||||
|
||||
|
@ -32,7 +32,6 @@ type PersistenceService interface {
|
||||
FetchTask(ctx context.Context, taskID string) (*persistence.Task, error)
|
||||
SaveTask(ctx context.Context, task *persistence.Task) error
|
||||
SaveTaskActivity(ctx context.Context, t *persistence.Task) error
|
||||
FetchTasksOfWorkerInStatus(context.Context, *persistence.Worker, api.TaskStatus) ([]*persistence.Task, error)
|
||||
// TaskTouchedByWorker marks the task as 'touched' by a worker. This is used for timeout detection.
|
||||
TaskTouchedByWorker(context.Context, *persistence.Task) error
|
||||
|
||||
@ -59,6 +58,8 @@ type TaskStateMachine interface {
|
||||
|
||||
// JobStatusChange gives a Job a new status, and handles the resulting status changes on its tasks.
|
||||
JobStatusChange(ctx context.Context, job *persistence.Job, newJobStatus api.JobStatus, reason string) error
|
||||
|
||||
RequeueTasksOfWorker(ctx context.Context, worker *persistence.Worker, reason string) error
|
||||
}
|
||||
|
||||
// TaskStateMachine should be a subset of task_state_machine.StateMachine.
|
||||
|
@ -84,21 +84,6 @@ func (mr *MockPersistenceServiceMockRecorder) FetchTask(arg0, arg1 interface{})
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchTask", reflect.TypeOf((*MockPersistenceService)(nil).FetchTask), arg0, arg1)
|
||||
}
|
||||
|
||||
// FetchTasksOfWorkerInStatus mocks base method.
|
||||
func (m *MockPersistenceService) FetchTasksOfWorkerInStatus(arg0 context.Context, arg1 *persistence.Worker, arg2 api.TaskStatus) ([]*persistence.Task, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "FetchTasksOfWorkerInStatus", arg0, arg1, arg2)
|
||||
ret0, _ := ret[0].([]*persistence.Task)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// FetchTasksOfWorkerInStatus indicates an expected call of FetchTasksOfWorkerInStatus.
|
||||
func (mr *MockPersistenceServiceMockRecorder) FetchTasksOfWorkerInStatus(arg0, arg1, arg2 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchTasksOfWorkerInStatus", reflect.TypeOf((*MockPersistenceService)(nil).FetchTasksOfWorkerInStatus), arg0, arg1, arg2)
|
||||
}
|
||||
|
||||
// FetchWorker mocks base method.
|
||||
func (m *MockPersistenceService) FetchWorker(arg0 context.Context, arg1 string) (*persistence.Worker, error) {
|
||||
m.ctrl.T.Helper()
|
||||
@ -550,6 +535,20 @@ func (mr *MockTaskStateMachineMockRecorder) JobStatusChange(arg0, arg1, arg2, ar
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "JobStatusChange", reflect.TypeOf((*MockTaskStateMachine)(nil).JobStatusChange), arg0, arg1, arg2, arg3)
|
||||
}
|
||||
|
||||
// RequeueTasksOfWorker mocks base method.
|
||||
func (m *MockTaskStateMachine) RequeueTasksOfWorker(arg0 context.Context, arg1 *persistence.Worker, arg2 string) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "RequeueTasksOfWorker", arg0, arg1, arg2)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// RequeueTasksOfWorker indicates an expected call of RequeueTasksOfWorker.
|
||||
func (mr *MockTaskStateMachineMockRecorder) RequeueTasksOfWorker(arg0, arg1, arg2 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequeueTasksOfWorker", reflect.TypeOf((*MockTaskStateMachine)(nil).RequeueTasksOfWorker), arg0, arg1, arg2)
|
||||
}
|
||||
|
||||
// TaskStatusChange mocks base method.
|
||||
func (m *MockTaskStateMachine) TaskStatusChange(arg0 context.Context, arg1 *persistence.Task, arg2 api.TaskStatus) error {
|
||||
m.ctrl.T.Helper()
|
||||
|
@ -168,7 +168,7 @@ func (f *Flamenco) SignOff(e echo.Context) error {
|
||||
}
|
||||
|
||||
// Re-queue all tasks (should be only one) this worker is now working on.
|
||||
err = f.workerRequeueActiveTasks(ctx, logger, w)
|
||||
err = f.stateMachine.RequeueTasksOfWorker(ctx, w, "worker signed off")
|
||||
if err != nil {
|
||||
return sendAPIError(e, http.StatusInternalServerError, "error re-queueing your tasks")
|
||||
}
|
||||
@ -180,44 +180,6 @@ func (f *Flamenco) SignOff(e echo.Context) error {
|
||||
return e.NoContent(http.StatusNoContent)
|
||||
}
|
||||
|
||||
// workerRequeueActiveTasks re-queues all active tasks (should be max one) of this worker.
|
||||
func (f *Flamenco) workerRequeueActiveTasks(ctx context.Context, logger zerolog.Logger, worker *persistence.Worker) error {
|
||||
// Fetch the tasks to update.
|
||||
tasks, err := f.persist.FetchTasksOfWorkerInStatus(ctx, worker, api.TaskStatusActive)
|
||||
if err != nil {
|
||||
return fmt.Errorf("fetching tasks of worker %s in status %q: %w", worker.UUID, api.TaskStatusActive, err)
|
||||
}
|
||||
|
||||
// Run each task change through the task state machine.
|
||||
var lastErr error
|
||||
for _, task := range tasks {
|
||||
logger.Info().
|
||||
Str("task", task.UUID).
|
||||
Msg("re-queueing task")
|
||||
|
||||
// Write to task activity that it got requeued because of worker sign-off.
|
||||
task.Activity = "Task requeued because worked signed off"
|
||||
if err := f.persist.SaveTaskActivity(ctx, task); err != nil {
|
||||
logger.Warn().Err(err).
|
||||
Str("task", task.UUID).
|
||||
Msg("error queueing task on worker sign-off")
|
||||
lastErr = err
|
||||
}
|
||||
|
||||
if err := f.stateMachine.TaskStatusChange(ctx, task, api.TaskStatusQueued); err != nil {
|
||||
logger.Warn().Err(err).
|
||||
Str("task", task.UUID).
|
||||
Msg("error queueing task on worker sign-off")
|
||||
lastErr = err
|
||||
}
|
||||
|
||||
_ = f.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID,
|
||||
"Task was requeued by Manager because the worker assigned to it signed off.")
|
||||
}
|
||||
|
||||
return lastErr
|
||||
}
|
||||
|
||||
// (GET /api/worker/state)
|
||||
func (f *Flamenco) WorkerState(e echo.Context) error {
|
||||
worker := requestWorkerOrPanic(e)
|
||||
|
@ -134,23 +134,6 @@ func TestWorkerSignoffTaskRequeue(t *testing.T) {
|
||||
mf := newMockedFlamenco(mockCtrl)
|
||||
worker := testWorker()
|
||||
|
||||
job := persistence.Job{
|
||||
UUID: "583a7d59-887a-4c6c-b3e4-a753018f71b0",
|
||||
}
|
||||
// Mock that the worker has two active tasks. It shouldn't happen, but even
|
||||
// when it does, both should be requeued when the worker signs off.
|
||||
task1 := persistence.Task{
|
||||
UUID: "4107c7aa-e86d-4244-858b-6c4fce2af503",
|
||||
Job: &job,
|
||||
Status: api.TaskStatusActive,
|
||||
}
|
||||
task2 := persistence.Task{
|
||||
UUID: "beb3f39b-57a5-44bf-a0ad-533e3513a0b6",
|
||||
Job: &job,
|
||||
Status: api.TaskStatusActive,
|
||||
}
|
||||
workerTasks := []*persistence.Task{&task1, &task2}
|
||||
|
||||
// Signing off should be handled completely, even when the HTTP connection
|
||||
// breaks. This means using a different context than the one passed by Echo.
|
||||
echo := mf.prepareMockedRequest(nil)
|
||||
@ -158,18 +141,7 @@ func TestWorkerSignoffTaskRequeue(t *testing.T) {
|
||||
expectCtx := gomock.Not(gomock.Eq(echo.Request().Context()))
|
||||
|
||||
// Expect worker's tasks to be re-queued.
|
||||
mf.persistence.EXPECT().
|
||||
FetchTasksOfWorkerInStatus(expectCtx, &worker, api.TaskStatusActive).
|
||||
Return(workerTasks, nil)
|
||||
mf.stateMachine.EXPECT().TaskStatusChange(expectCtx, &task1, api.TaskStatusQueued)
|
||||
mf.stateMachine.EXPECT().TaskStatusChange(expectCtx, &task2, api.TaskStatusQueued)
|
||||
|
||||
// Expect this re-queueing to end up in the task's log and activity.
|
||||
mf.persistence.EXPECT().SaveTaskActivity(expectCtx, &task1) // TODO: test saved activity value
|
||||
mf.persistence.EXPECT().SaveTaskActivity(expectCtx, &task2) // TODO: test saved activity value
|
||||
logMsg := "Task was requeued by Manager because the worker assigned to it signed off."
|
||||
mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, task1.UUID, logMsg)
|
||||
mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, task2.UUID, logMsg)
|
||||
mf.stateMachine.EXPECT().RequeueTasksOfWorker(expectCtx, &worker, "worker signed off").Return(nil)
|
||||
|
||||
// Expect worker to be saved as 'offline'.
|
||||
mf.persistence.EXPECT().
|
||||
@ -220,10 +192,7 @@ func TestWorkerSignoffStatusChangeRequest(t *testing.T) {
|
||||
savedWorker.StatusChangeClear()
|
||||
mf.persistence.EXPECT().SaveWorkerStatus(gomock.Any(), &savedWorker).Return(nil)
|
||||
|
||||
// Mimick that no tasks are currently being worked on.
|
||||
mf.persistence.EXPECT().
|
||||
FetchTasksOfWorkerInStatus(gomock.Any(), &worker, api.TaskStatusActive).
|
||||
Return(nil, nil)
|
||||
mf.stateMachine.EXPECT().RequeueTasksOfWorker(gomock.Any(), &worker, "worker signed off").Return(nil)
|
||||
|
||||
// Perform the request
|
||||
echo := mf.prepareMockedRequest(nil)
|
||||
|
@ -6,15 +6,18 @@ import (
|
||||
"context"
|
||||
|
||||
"git.blender.org/flamenco/internal/manager/persistence"
|
||||
"git.blender.org/flamenco/internal/manager/task_logs"
|
||||
"git.blender.org/flamenco/internal/manager/webupdates"
|
||||
"git.blender.org/flamenco/pkg/api"
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
// Generate mock implementations of these interfaces.
|
||||
//go:generate go run github.com/golang/mock/mockgen -destination mocks/interfaces_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/task_state_machine PersistenceService,ChangeBroadcaster
|
||||
//go:generate go run github.com/golang/mock/mockgen -destination mocks/interfaces_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/task_state_machine PersistenceService,ChangeBroadcaster,LogStorage
|
||||
|
||||
type PersistenceService interface {
|
||||
SaveTask(ctx context.Context, task *persistence.Task) error
|
||||
SaveTaskActivity(ctx context.Context, t *persistence.Task) error
|
||||
SaveJobStatus(ctx context.Context, j *persistence.Job) error
|
||||
|
||||
JobHasTasksInStatus(ctx context.Context, job *persistence.Job, taskStatus api.TaskStatus) (bool, error)
|
||||
@ -30,6 +33,7 @@ type PersistenceService interface {
|
||||
statusesToUpdate []api.TaskStatus, taskStatus api.TaskStatus, activity string) error
|
||||
|
||||
FetchJobsInStatus(ctx context.Context, jobStatuses ...api.JobStatus) ([]*persistence.Job, error)
|
||||
FetchTasksOfWorkerInStatus(context.Context, *persistence.Worker, api.TaskStatus) ([]*persistence.Task, error)
|
||||
}
|
||||
|
||||
// PersistenceService should be a subset of persistence.DB
|
||||
@ -45,3 +49,10 @@ type ChangeBroadcaster interface {
|
||||
|
||||
// ChangeBroadcaster should be a subset of webupdates.BiDirComms
|
||||
var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil)
|
||||
|
||||
// LogStorage writes to task logs.
|
||||
type LogStorage interface {
|
||||
WriteTimestamped(logger zerolog.Logger, jobID, taskID string, logText string) error
|
||||
}
|
||||
|
||||
var _ LogStorage = (*task_logs.Storage)(nil)
|
||||
|
@ -1,5 +1,5 @@
|
||||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: git.blender.org/flamenco/internal/manager/task_state_machine (interfaces: PersistenceService,ChangeBroadcaster)
|
||||
// Source: git.blender.org/flamenco/internal/manager/task_state_machine (interfaces: PersistenceService,ChangeBroadcaster,LogStorage)
|
||||
|
||||
// Package mocks is a generated GoMock package.
|
||||
package mocks
|
||||
@ -11,6 +11,7 @@ import (
|
||||
persistence "git.blender.org/flamenco/internal/manager/persistence"
|
||||
api "git.blender.org/flamenco/pkg/api"
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
zerolog "github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
// MockPersistenceService is a mock of PersistenceService interface.
|
||||
@ -77,6 +78,21 @@ func (mr *MockPersistenceServiceMockRecorder) FetchJobsInStatus(arg0 interface{}
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchJobsInStatus", reflect.TypeOf((*MockPersistenceService)(nil).FetchJobsInStatus), varargs...)
|
||||
}
|
||||
|
||||
// FetchTasksOfWorkerInStatus mocks base method.
|
||||
func (m *MockPersistenceService) FetchTasksOfWorkerInStatus(arg0 context.Context, arg1 *persistence.Worker, arg2 api.TaskStatus) ([]*persistence.Task, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "FetchTasksOfWorkerInStatus", arg0, arg1, arg2)
|
||||
ret0, _ := ret[0].([]*persistence.Task)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// FetchTasksOfWorkerInStatus indicates an expected call of FetchTasksOfWorkerInStatus.
|
||||
func (mr *MockPersistenceServiceMockRecorder) FetchTasksOfWorkerInStatus(arg0, arg1, arg2 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchTasksOfWorkerInStatus", reflect.TypeOf((*MockPersistenceService)(nil).FetchTasksOfWorkerInStatus), arg0, arg1, arg2)
|
||||
}
|
||||
|
||||
// JobHasTasksInStatus mocks base method.
|
||||
func (m *MockPersistenceService) JobHasTasksInStatus(arg0 context.Context, arg1 *persistence.Job, arg2 api.TaskStatus) (bool, error) {
|
||||
m.ctrl.T.Helper()
|
||||
@ -120,6 +136,20 @@ func (mr *MockPersistenceServiceMockRecorder) SaveTask(arg0, arg1 interface{}) *
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveTask", reflect.TypeOf((*MockPersistenceService)(nil).SaveTask), arg0, arg1)
|
||||
}
|
||||
|
||||
// SaveTaskActivity mocks base method.
|
||||
func (m *MockPersistenceService) SaveTaskActivity(arg0 context.Context, arg1 *persistence.Task) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "SaveTaskActivity", arg0, arg1)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// SaveTaskActivity indicates an expected call of SaveTaskActivity.
|
||||
func (mr *MockPersistenceServiceMockRecorder) SaveTaskActivity(arg0, arg1 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveTaskActivity", reflect.TypeOf((*MockPersistenceService)(nil).SaveTaskActivity), arg0, arg1)
|
||||
}
|
||||
|
||||
// UpdateJobsTaskStatuses mocks base method.
|
||||
func (m *MockPersistenceService) UpdateJobsTaskStatuses(arg0 context.Context, arg1 *persistence.Job, arg2 api.TaskStatus, arg3 string) error {
|
||||
m.ctrl.T.Helper()
|
||||
@ -194,3 +224,40 @@ func (mr *MockChangeBroadcasterMockRecorder) BroadcastTaskUpdate(arg0 interface{
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastTaskUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastTaskUpdate), arg0)
|
||||
}
|
||||
|
||||
// MockLogStorage is a mock of LogStorage interface.
|
||||
type MockLogStorage struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockLogStorageMockRecorder
|
||||
}
|
||||
|
||||
// MockLogStorageMockRecorder is the mock recorder for MockLogStorage.
|
||||
type MockLogStorageMockRecorder struct {
|
||||
mock *MockLogStorage
|
||||
}
|
||||
|
||||
// NewMockLogStorage creates a new mock instance.
|
||||
func NewMockLogStorage(ctrl *gomock.Controller) *MockLogStorage {
|
||||
mock := &MockLogStorage{ctrl: ctrl}
|
||||
mock.recorder = &MockLogStorageMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use.
|
||||
func (m *MockLogStorage) EXPECT() *MockLogStorageMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// WriteTimestamped mocks base method.
|
||||
func (m *MockLogStorage) WriteTimestamped(arg0 zerolog.Logger, arg1, arg2, arg3 string) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "WriteTimestamped", arg0, arg1, arg2, arg3)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// WriteTimestamped indicates an expected call of WriteTimestamped.
|
||||
func (mr *MockLogStorageMockRecorder) WriteTimestamped(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteTimestamped", reflect.TypeOf((*MockLogStorage)(nil).WriteTimestamped), arg0, arg1, arg2, arg3)
|
||||
}
|
||||
|
@ -22,13 +22,14 @@ const taskFailJobPercentage = 10 // Integer from 0 to 100.
|
||||
type StateMachine struct {
|
||||
persist PersistenceService
|
||||
broadcaster ChangeBroadcaster
|
||||
logStorage LogStorage
|
||||
}
|
||||
|
||||
|
||||
func NewStateMachine(persist PersistenceService, broadcaster ChangeBroadcaster) *StateMachine {
|
||||
func NewStateMachine(persist PersistenceService, broadcaster ChangeBroadcaster, logStorage LogStorage) *StateMachine {
|
||||
return &StateMachine{
|
||||
persist: persist,
|
||||
broadcaster: broadcaster,
|
||||
logStorage: logStorage,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
type StateMachineMocks struct {
|
||||
persist *mocks.MockPersistenceService
|
||||
broadcaster *mocks.MockChangeBroadcaster
|
||||
logStorage *mocks.MockLogStorage
|
||||
}
|
||||
|
||||
// In the comments below, "T" indicates the performed task status change, and
|
||||
@ -361,8 +362,9 @@ func mockedTaskStateMachine(mockCtrl *gomock.Controller) (*StateMachine, *StateM
|
||||
mocks := StateMachineMocks{
|
||||
persist: mocks.NewMockPersistenceService(mockCtrl),
|
||||
broadcaster: mocks.NewMockChangeBroadcaster(mockCtrl),
|
||||
logStorage: mocks.NewMockLogStorage(mockCtrl),
|
||||
}
|
||||
sm := NewStateMachine(mocks.persist, mocks.broadcaster)
|
||||
sm := NewStateMachine(mocks.persist, mocks.broadcaster, mocks.logStorage)
|
||||
return sm, &mocks
|
||||
}
|
||||
|
||||
|
62
internal/manager/task_state_machine/worker_requeue.go
Normal file
62
internal/manager/task_state_machine/worker_requeue.go
Normal file
@ -0,0 +1,62 @@
|
||||
package task_state_machine
|
||||
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.blender.org/flamenco/internal/manager/persistence"
|
||||
"git.blender.org/flamenco/pkg/api"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
// RequeueTasksOfWorker re-queues all active tasks (should be max one) of this worker.
|
||||
//
|
||||
// `reason`: a string that can be appended to text like "Task requeued because "
|
||||
func (sm *StateMachine) RequeueTasksOfWorker(
|
||||
ctx context.Context,
|
||||
worker *persistence.Worker,
|
||||
reason string,
|
||||
) error {
|
||||
logger := log.With().
|
||||
Str("worker", worker.UUID).
|
||||
Logger()
|
||||
|
||||
// Fetch the tasks to update.
|
||||
tasks, err := sm.persist.FetchTasksOfWorkerInStatus(ctx, worker, api.TaskStatusActive)
|
||||
if err != nil {
|
||||
return fmt.Errorf("fetching tasks of worker %s in status %q: %w", worker.UUID, api.TaskStatusActive, err)
|
||||
}
|
||||
|
||||
// Run each task change through the task state machine.
|
||||
var lastErr error
|
||||
for _, task := range tasks {
|
||||
logger.Info().
|
||||
Str("task", task.UUID).
|
||||
Msg("re-queueing task")
|
||||
|
||||
// Write to task activity that it got requeued because of worker sign-off.
|
||||
task.Activity = "Task was requeued by Manager because " + reason
|
||||
if err := sm.persist.SaveTaskActivity(ctx, task); err != nil {
|
||||
logger.Warn().Err(err).
|
||||
Str("task", task.UUID).
|
||||
Str("reason", reason).
|
||||
Str("activity", task.Activity).
|
||||
Msg("error saving task activity to database")
|
||||
lastErr = err
|
||||
}
|
||||
|
||||
if err := sm.TaskStatusChange(ctx, task, api.TaskStatusQueued); err != nil {
|
||||
logger.Warn().Err(err).
|
||||
Str("task", task.UUID).
|
||||
Str("reason", reason).
|
||||
Msg("error queueing task")
|
||||
lastErr = err
|
||||
}
|
||||
|
||||
_ = sm.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, task.Activity)
|
||||
}
|
||||
|
||||
return lastErr
|
||||
}
|
64
internal/manager/task_state_machine/worker_requeue_test.go
Normal file
64
internal/manager/task_state_machine/worker_requeue_test.go
Normal file
@ -0,0 +1,64 @@
|
||||
package task_state_machine
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"git.blender.org/flamenco/internal/manager/persistence"
|
||||
"git.blender.org/flamenco/pkg/api"
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestRequeueTasksOfWorker(t *testing.T) {
|
||||
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
worker := persistence.Worker{
|
||||
UUID: "3ed470c8-d41e-4668-92d0-d799997433a4",
|
||||
Name: "testert",
|
||||
}
|
||||
|
||||
// Mock that the worker has two active tasks. It shouldn't happen, but even
|
||||
// when it does, both should be requeued when the worker signs off.
|
||||
task1 := taskWithStatus(api.JobStatusActive, api.TaskStatusActive)
|
||||
task2 := taskOfSameJob(task1, api.TaskStatusActive)
|
||||
workerTasks := []*persistence.Task{task1, task2}
|
||||
|
||||
task1PrevStatus := task1.Status
|
||||
task2PrevStatus := task2.Status
|
||||
|
||||
mocks.persist.EXPECT().FetchTasksOfWorkerInStatus(ctx, &worker, api.TaskStatusActive).Return(workerTasks, nil)
|
||||
|
||||
// Expect this re-queueing to end up in the task's log and activity.
|
||||
mocks.persist.EXPECT().SaveTaskActivity(ctx, task1) // TODO: test saved activity value
|
||||
mocks.persist.EXPECT().SaveTaskActivity(ctx, task2) // TODO: test saved activity value
|
||||
mocks.persist.EXPECT().SaveTask(ctx, task1) // TODO: test saved task status
|
||||
mocks.persist.EXPECT().SaveTask(ctx, task2) // TODO: test saved task status
|
||||
|
||||
logMsg := "Task was requeued by Manager because worker had to test"
|
||||
mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), task1.Job.UUID, task1.UUID, logMsg)
|
||||
mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), task2.Job.UUID, task2.UUID, logMsg)
|
||||
|
||||
mocks.broadcaster.EXPECT().BroadcastTaskUpdate(api.SocketIOTaskUpdate{
|
||||
Activity: logMsg,
|
||||
Id: task1.UUID,
|
||||
JobId: task1.Job.UUID,
|
||||
Name: task1.Name,
|
||||
PreviousStatus: &task1PrevStatus,
|
||||
Status: api.TaskStatusQueued,
|
||||
Updated: task1.UpdatedAt,
|
||||
})
|
||||
|
||||
mocks.broadcaster.EXPECT().BroadcastTaskUpdate(api.SocketIOTaskUpdate{
|
||||
Activity: logMsg,
|
||||
Id: task2.UUID,
|
||||
JobId: task2.Job.UUID,
|
||||
Name: task2.Name,
|
||||
PreviousStatus: &task2PrevStatus,
|
||||
Status: api.TaskStatusQueued,
|
||||
Updated: task2.UpdatedAt,
|
||||
})
|
||||
|
||||
err := sm.RequeueTasksOfWorker(ctx, &worker, "worker had to test")
|
||||
assert.NoError(t, err)
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user