
When a Worker would sign off while working on a task, and that task's job would be in `pause-requested` state, it would always re-queue the task. This in turn would not get detected, which caused the job to get stuck. Now tasks correctly go to `paused` when a Worker signs off, and subsequently the job will be re-checked and go to `paused` when possible as well. Note that this does not handle already-stuck jobs. That'll be for the following commit.
156 lines
5.8 KiB
Go
156 lines
5.8 KiB
Go
package task_state_machine
|
|
|
|
// SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
import (
|
|
"testing"
|
|
|
|
"github.com/golang/mock/gomock"
|
|
"github.com/stretchr/testify/require"
|
|
"projects.blender.org/studio/flamenco/internal/manager/persistence"
|
|
"projects.blender.org/studio/flamenco/pkg/api"
|
|
)
|
|
|
|
func TestRequeueActiveTasksOfWorker(t *testing.T) {
|
|
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
|
|
defer mockCtrl.Finish()
|
|
|
|
worker := persistence.Worker{
|
|
UUID: "3ed470c8-d41e-4668-92d0-d799997433a4",
|
|
Name: "testert",
|
|
}
|
|
|
|
// Mock that the worker has two active tasks. It shouldn't happen, but even
|
|
// when it does, both should be requeued when the worker signs off.
|
|
task1, job := taskWithStatus(api.JobStatusActive, api.TaskStatusActive)
|
|
task2 := taskOfSameJob(task1, api.TaskStatusActive)
|
|
workerTasks := []persistence.TaskJob{
|
|
{Task: *task1, JobUUID: job.UUID},
|
|
{Task: *task2, JobUUID: job.UUID},
|
|
}
|
|
|
|
task1PrevStatus := task1.Status
|
|
task2PrevStatus := task2.Status
|
|
|
|
mocks.persist.EXPECT().FetchJob(ctx, job.UUID).Return(job, nil).Times(len(workerTasks))
|
|
mocks.persist.EXPECT().FetchTasksOfWorkerInStatus(ctx, &worker, api.TaskStatusActive).Return(workerTasks, nil)
|
|
|
|
// Expect this re-queueing to end up in the task's log and activity.
|
|
logMsg1 := "task changed status active -> queued"
|
|
logMsg2 := "Task was queued by Manager because worker had to test"
|
|
task1WithActivity := *task1
|
|
task1WithActivity.Activity = logMsg2
|
|
task2WithActivity := *task2
|
|
task2WithActivity.Activity = logMsg2
|
|
task1WithActivityAndStatus := task1WithActivity
|
|
task1WithActivityAndStatus.Status = api.TaskStatusQueued
|
|
task2WithActivityAndStatus := task2WithActivity
|
|
task2WithActivityAndStatus.Status = api.TaskStatusQueued
|
|
mocks.persist.EXPECT().SaveTaskActivity(ctx, &task1WithActivity)
|
|
mocks.persist.EXPECT().SaveTaskActivity(ctx, &task2WithActivity)
|
|
mocks.persist.EXPECT().SaveTaskStatus(ctx, &task1WithActivityAndStatus)
|
|
mocks.persist.EXPECT().SaveTaskStatus(ctx, &task2WithActivityAndStatus)
|
|
|
|
mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, task1.UUID, logMsg1)
|
|
mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, task2.UUID, logMsg1)
|
|
|
|
mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, task1.UUID, logMsg2)
|
|
mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, task2.UUID, logMsg2)
|
|
|
|
mocks.broadcaster.EXPECT().BroadcastTaskUpdate(api.EventTaskUpdate{
|
|
Activity: logMsg2,
|
|
Id: task1.UUID,
|
|
JobId: job.UUID,
|
|
Name: task1.Name,
|
|
PreviousStatus: &task1PrevStatus,
|
|
Status: api.TaskStatusQueued,
|
|
Updated: task1.UpdatedAt.Time,
|
|
})
|
|
|
|
mocks.broadcaster.EXPECT().BroadcastTaskUpdate(api.EventTaskUpdate{
|
|
Activity: logMsg2,
|
|
Id: task2.UUID,
|
|
JobId: job.UUID,
|
|
Name: task2.Name,
|
|
PreviousStatus: &task2PrevStatus,
|
|
Status: api.TaskStatusQueued,
|
|
Updated: task2.UpdatedAt.Time,
|
|
})
|
|
|
|
mocks.expectFetchJobOfTask(task1, job)
|
|
mocks.expectFetchJobOfTask(task2, job)
|
|
|
|
err := sm.RequeueActiveTasksOfWorker(ctx, &worker, "worker had to test")
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
// Check that a Worker's task goes to 'paused' when the worker signs off and the
|
|
// job is in 'pause-requested' state. This should also ensure that, if this
|
|
// happens to be the last-active task of the job, that the entire job goes to
|
|
// `paused` state.
|
|
func TestPauseActiveTasksOfWorker(t *testing.T) {
|
|
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
|
|
defer mockCtrl.Finish()
|
|
|
|
worker := persistence.Worker{
|
|
UUID: "3ed470c8-d41e-4668-92d0-d799997433a4",
|
|
Name: "testert",
|
|
}
|
|
|
|
// Mock that the job is in pause-requested, with one paused task and one active task.
|
|
task1, job := taskWithStatus(api.JobStatusPauseRequested, api.TaskStatusActive)
|
|
taskOfSameJob(task1, api.TaskStatusPaused) // Create a 2nd, already-paused task.
|
|
|
|
jobPrevStatus := job.Status
|
|
task1PrevStatus := task1.Status
|
|
|
|
mocks.persist.EXPECT().FetchJob(ctx, job.UUID).Return(job, nil)
|
|
mocks.persist.EXPECT().FetchTasksOfWorkerInStatus(ctx, &worker, api.TaskStatusActive).
|
|
Return([]persistence.TaskJob{{Task: *task1, JobUUID: job.UUID}}, nil)
|
|
|
|
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive).Return(0, 2, nil) // 0 of 2 active.
|
|
|
|
// Expect this re-queueing to end up in the task's log and activity.
|
|
logMsg1 := "task changed status active -> paused"
|
|
logMsg2 := "Task was paused by Manager because worker had to test"
|
|
task1WithActivity := *task1
|
|
task1WithActivity.Activity = logMsg2
|
|
task1WithActivityAndStatus := task1WithActivity
|
|
task1WithActivityAndStatus.Status = api.TaskStatusPaused
|
|
mocks.persist.EXPECT().SaveTaskActivity(ctx, &task1WithActivity)
|
|
mocks.persist.EXPECT().SaveTaskStatus(ctx, &task1WithActivityAndStatus)
|
|
|
|
jobWithStatus := *job
|
|
jobWithStatus.Status = api.JobStatusPaused
|
|
jobWithStatus.Activity = "Changed to status \"paused\": last task got paused"
|
|
mocks.persist.EXPECT().SaveJobStatus(ctx, &jobWithStatus)
|
|
|
|
mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, task1.UUID, logMsg1)
|
|
mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, task1.UUID, logMsg2)
|
|
|
|
mocks.broadcaster.EXPECT().BroadcastTaskUpdate(api.EventTaskUpdate{
|
|
Activity: logMsg2,
|
|
Id: task1.UUID,
|
|
JobId: job.UUID,
|
|
Name: task1.Name,
|
|
PreviousStatus: &task1PrevStatus,
|
|
Status: api.TaskStatusPaused,
|
|
Updated: task1.UpdatedAt.Time,
|
|
})
|
|
mocks.broadcaster.EXPECT().BroadcastJobUpdate(api.EventJobUpdate{
|
|
Id: job.UUID,
|
|
Name: &job.Name,
|
|
PreviousStatus: &jobPrevStatus,
|
|
Priority: int(job.Priority),
|
|
RefreshTasks: false,
|
|
Status: api.JobStatusPaused,
|
|
Type: job.JobType,
|
|
Updated: job.UpdatedAt.Time,
|
|
})
|
|
|
|
mocks.expectFetchJobOfTask(task1, job)
|
|
|
|
err := sm.RequeueActiveTasksOfWorker(ctx, &worker, "worker had to test")
|
|
require.NoError(t, err)
|
|
}
|