flamenco/internal/manager/task_state_machine/task_state_machine_test.go
Sybren A. Stüvel 531a0184f7 Transition from ex-GORM structs to sqlc structs (5/5)
Replace old used-to-be-GORM datastructures (#104305) with sqlc-generated
structs. This also makes it possible to use more specific structs that
are more taylored to the specific queries, increasing efficiency.

This commit deals with the remaining areas, like the job deleter, task
timeout checker, and task state machine. And anything else to get things
running again.

Functional changes are kept to a minimum, as the API still serves the
same data.

Because this work covers so much of Flamenco's code, it's been split up
into different commits. Each commit brings Flamenco to a state where it
compiles and unit tests pass. Only the result of the final commit has
actually been tested properly.

Ref: #104343
2024-12-04 14:00:22 +01:00

609 lines
25 KiB
Go

package task_state_machine
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"errors"
"fmt"
"testing"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"projects.blender.org/studio/flamenco/internal/manager/persistence"
"projects.blender.org/studio/flamenco/internal/manager/task_state_machine/mocks"
"projects.blender.org/studio/flamenco/pkg/api"
)
type StateMachineMocks struct {
persist *mocks.MockPersistenceService
broadcaster *mocks.MockChangeBroadcaster
logStorage *mocks.MockLogStorage
}
// In the comments below, "T" indicates the performed task status change, and
// "J" the expected resulting job status change.
func TestTaskStatusChangeQueuedToActive(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
// T: queued > active --> J: queued > active
task, job := taskWithStatus(api.JobStatusQueued, api.TaskStatusQueued)
mocks.expectSaveTaskWithStatus(t, task, api.TaskStatusActive)
mocks.expectWriteTaskLogTimestamped(t, task, job.UUID, "task changed status queued -> active")
mocks.expectSaveJobWithStatus(t, job, api.JobStatusActive)
mocks.expectBroadcastJobChange(job, api.JobStatusQueued, api.JobStatusActive)
mocks.expectBroadcastTaskChange(task, job.UUID, api.TaskStatusQueued, api.TaskStatusActive)
mocks.expectFetchJobOfTask(task, job)
require.NoError(t, sm.TaskStatusChange(ctx, task, api.TaskStatusActive))
}
func TestTaskStatusChangeSaveTaskAfterJobChangeFailure(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
// A task status change should be saved, even when triggering the job change errors somehow.
task, job := taskWithStatus(api.JobStatusQueued, api.TaskStatusQueued)
jobSaveErr := errors.New("hypothetical job save error")
mocks.persist.EXPECT().
SaveJobStatus(gomock.Any(), job).
Return(jobSaveErr)
// Expect a call to save the task in the persistence layer, regardless of the above error.
mocks.expectSaveTaskWithStatus(t, task, api.TaskStatusActive)
mocks.expectWriteTaskLogTimestamped(t, task, job.UUID, "task changed status queued -> active")
mocks.expectBroadcastTaskChange(task, job.UUID, api.TaskStatusQueued, api.TaskStatusActive)
mocks.expectFetchJobOfTask(task, job)
returnedErr := sm.TaskStatusChange(ctx, task, api.TaskStatusActive)
assert.ErrorIs(t, returnedErr, jobSaveErr, "the returned error should wrap the persistence layer error")
}
func TestTaskStatusChangeActiveToCompleted(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
// Job has three tasks.
task1, job := taskWithStatus(api.JobStatusActive, api.TaskStatusActive)
task2 := taskOfSameJob(task1, api.TaskStatusActive)
task3 := taskOfSameJob(task1, api.TaskStatusActive)
// First task completing: T: active > completed --> J: active > active
mocks.expectSaveTaskWithStatus(t, task1, api.TaskStatusCompleted)
mocks.expectWriteTaskLogTimestamped(t, task1, job.UUID, "task changed status active -> completed")
mocks.expectBroadcastTaskChange(task1, job.UUID, api.TaskStatusActive, api.TaskStatusCompleted)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted).Return(1, 3, nil) // 1 of 3 complete.
mocks.expectFetchJobOfTask(task1, job)
require.NoError(t, sm.TaskStatusChange(ctx, task1, api.TaskStatusCompleted))
// Second task hickup: T: active > soft-failed --> J: active > active
mocks.expectSaveTaskWithStatus(t, task2, api.TaskStatusSoftFailed)
mocks.expectWriteTaskLogTimestamped(t, task2, job.UUID, "task changed status active -> soft-failed")
mocks.expectBroadcastTaskChange(task2, job.UUID, api.TaskStatusActive, api.TaskStatusSoftFailed)
mocks.expectFetchJobOfTask(task2, job)
require.NoError(t, sm.TaskStatusChange(ctx, task2, api.TaskStatusSoftFailed))
// Second task completing: T: soft-failed > completed --> J: active > active
mocks.expectSaveTaskWithStatus(t, task2, api.TaskStatusCompleted)
mocks.expectWriteTaskLogTimestamped(t, task2, job.UUID, "task changed status soft-failed -> completed")
mocks.expectBroadcastTaskChange(task2, job.UUID, api.TaskStatusSoftFailed, api.TaskStatusCompleted)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted).Return(2, 3, nil) // 2 of 3 complete.
mocks.expectFetchJobOfTask(task2, job)
require.NoError(t, sm.TaskStatusChange(ctx, task2, api.TaskStatusCompleted))
// Third task completing: T: active > completed --> J: active > completed
mocks.expectSaveTaskWithStatus(t, task3, api.TaskStatusCompleted)
mocks.expectWriteTaskLogTimestamped(t, task3, job.UUID, "task changed status active -> completed")
mocks.expectBroadcastTaskChange(task3, job.UUID, api.TaskStatusActive, api.TaskStatusCompleted)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted).Return(3, 3, nil) // 3 of 3 complete.
mocks.expectSaveJobWithStatus(t, job, api.JobStatusCompleted)
mocks.expectBroadcastJobChange(job, api.JobStatusActive, api.JobStatusCompleted)
mocks.expectFetchJobOfTask(task3, job)
require.NoError(t, sm.TaskStatusChange(ctx, task3, api.TaskStatusCompleted))
}
func TestTaskStatusChangeQueuedToFailed(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
// T: queued > failed (1% task failure) --> J: queued > active
task, job := taskWithStatus(api.JobStatusQueued, api.TaskStatusQueued)
mocks.expectSaveTaskWithStatus(t, task, api.TaskStatusFailed)
mocks.expectWriteTaskLogTimestamped(t, task, job.UUID, "task changed status queued -> failed")
mocks.expectBroadcastTaskChange(task, job.UUID, api.TaskStatusQueued, api.TaskStatusFailed)
mocks.expectSaveJobWithStatus(t, job, api.JobStatusActive)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, api.TaskStatusFailed).Return(1, 100, nil) // 1 out of 100 failed.
mocks.expectBroadcastJobChange(job, api.JobStatusQueued, api.JobStatusActive)
mocks.expectFetchJobOfTask(task, job)
require.NoError(t, sm.TaskStatusChange(ctx, task, api.TaskStatusFailed))
}
func TestTaskStatusChangeActiveToFailedFailJob(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
// T: active > failed (10% task1 failure) --> J: active > failed + cancellation of any runnable tasks.
task1, job := taskWithStatus(api.JobStatusActive, api.TaskStatusActive)
mocks.expectSaveTaskWithStatus(t, task1, api.TaskStatusFailed)
mocks.expectWriteTaskLogTimestamped(t, task1, job.UUID, "task changed status active -> failed")
// The change to the failed task should be broadcast.
mocks.expectBroadcastTaskChange(task1, job.UUID, api.TaskStatusActive, api.TaskStatusFailed)
mocks.expectSaveJobWithStatus(t, job, api.JobStatusFailed)
// The resulting cancellation of the other tasks should be communicated as mass-task-update in the job update broadcast.
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusActive, api.JobStatusFailed)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, api.TaskStatusFailed).Return(10, 100, nil) // 10 out of 100 failed.
// Expect failure of the job to trigger cancellation of remaining tasks.
taskStatusesToCancel := []api.TaskStatus{
api.TaskStatusActive,
api.TaskStatusQueued,
api.TaskStatusSoftFailed,
}
mocks.persist.EXPECT().UpdateJobsTaskStatusesConditional(ctx, job, taskStatusesToCancel, api.TaskStatusCanceled,
"Manager cancelled this task because the job got status \"failed\".",
)
mocks.expectFetchJobOfTask(task1, job)
require.NoError(t, sm.TaskStatusChange(ctx, task1, api.TaskStatusFailed))
}
func TestTaskStatusChangeRequeueOnCompletedJob(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
// T: completed > queued --> J: completed > requeueing > queued
task, job := taskWithStatus(api.JobStatusCompleted, api.TaskStatusCompleted)
mocks.expectSaveTaskWithStatus(t, task, api.TaskStatusQueued)
mocks.expectWriteTaskLogTimestamped(t, task, job.UUID, "task changed status completed -> queued")
mocks.expectBroadcastTaskChange(task, job.UUID, api.TaskStatusCompleted, api.TaskStatusQueued)
mocks.expectSaveJobWithStatus(t, job, api.JobStatusRequeueing)
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusCompleted, api.JobStatusRequeueing)
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusRequeueing, api.JobStatusQueued)
// Expect queueing of the job to trigger queueing of all its tasks, if those tasks were all completed before.
// 2 out of 3 completed, because one was just queued.
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted).Return(2, 3, nil)
mocks.persist.EXPECT().UpdateJobsTaskStatuses(ctx, job, api.TaskStatusQueued,
"Queued because job transitioned status from \"completed\" to \"requeueing\"",
)
mocks.expectSaveJobWithStatus(t, job, api.JobStatusQueued)
mocks.expectFetchJobOfTask(task, job)
require.NoError(t, sm.TaskStatusChange(ctx, task, api.TaskStatusQueued))
}
func TestTaskStatusChangeCancelSingleTask(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
task, job := taskWithStatus(api.JobStatusCancelRequested, api.TaskStatusActive)
task2 := taskOfSameJob(task, api.TaskStatusQueued)
// T1: active > cancelled --> J: cancel-requested > cancel-requested
mocks.expectSaveTaskWithStatus(t, task, api.TaskStatusCanceled)
mocks.expectWriteTaskLogTimestamped(t, task, job.UUID, "task changed status active -> canceled")
mocks.expectBroadcastTaskChange(task, job.UUID, api.TaskStatusActive, api.TaskStatusCanceled)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job,
api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed).
Return(1, 2, nil)
mocks.expectFetchJobOfTask(task, job)
require.NoError(t, sm.TaskStatusChange(ctx, task, api.TaskStatusCanceled))
// T2: queued > cancelled --> J: cancel-requested > canceled
mocks.expectSaveTaskWithStatus(t, task2, api.TaskStatusCanceled)
mocks.expectWriteTaskLogTimestamped(t, task2, job.UUID, "task changed status queued -> canceled")
mocks.expectBroadcastTaskChange(task2, job.UUID, api.TaskStatusQueued, api.TaskStatusCanceled)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job,
api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed).
Return(0, 2, nil)
mocks.expectSaveJobWithStatus(t, job, api.JobStatusCanceled)
mocks.expectBroadcastJobChange(job, api.JobStatusCancelRequested, api.JobStatusCanceled)
mocks.expectFetchJobOfTask(task2, job)
require.NoError(t, sm.TaskStatusChange(ctx, task2, api.TaskStatusCanceled))
}
func TestTaskStatusChangeCancelSingleTaskWithOtherFailed(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
task1, job := taskWithStatus(api.JobStatusCancelRequested, api.TaskStatusActive)
task2 := taskOfSameJob(task1, api.TaskStatusFailed)
taskOfSameJob(task2, api.TaskStatusPaused)
// T1: active > cancelled --> J: cancel-requested > canceled because T2 already failed and cannot run anyway.
mocks.expectSaveTaskWithStatus(t, task1, api.TaskStatusCanceled)
mocks.expectWriteTaskLogTimestamped(t, task1, job.UUID, "task changed status active -> canceled")
mocks.expectBroadcastTaskChange(task1, job.UUID, api.TaskStatusActive, api.TaskStatusCanceled)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job,
api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed).
Return(0, 3, nil)
mocks.expectSaveJobWithStatus(t, job, api.JobStatusCanceled)
mocks.expectBroadcastJobChange(job, api.JobStatusCancelRequested, api.JobStatusCanceled)
mocks.expectFetchJobOfTask(task1, job)
// The canceled task just stays canceled, so don't expectBroadcastTaskChange(task3).
require.NoError(t, sm.TaskStatusChange(ctx, task1, api.TaskStatusCanceled))
}
func TestTaskStatusChangeUnknownStatus(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
// T: queued > borked --> saved to DB but otherwise ignored w.r.t. job status changes.
task, job := taskWithStatus(api.JobStatusQueued, api.TaskStatusQueued)
mocks.expectSaveTaskWithStatus(t, task, api.TaskStatus("borked"))
mocks.expectWriteTaskLogTimestamped(t, task, job.UUID, "task changed status queued -> borked")
mocks.expectBroadcastTaskChange(task, job.UUID, api.TaskStatusQueued, api.TaskStatus("borked"))
mocks.expectFetchJobOfTask(task, job)
require.NoError(t, sm.TaskStatusChange(ctx, task, api.TaskStatus("borked")))
}
func TestJobRequeueWithSomeCompletedTasks(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
_, job := taskWithStatus(api.JobStatusActive, api.TaskStatusCompleted)
// These are not necessary to create for this test, but just imagine these tasks are there too.
// This is mimicked by returning (1, 3, nil) when counting the tasks (1 of 3 completed).
// task2 := taskOfSameJob(task1, api.TaskStatusFailed)
// task3 := taskOfSameJob(task2, api.TaskStatusSoftFailed)
mocks.expectSaveJobWithStatus(t, job, api.JobStatusRequeueing)
// Expect queueing of the job to trigger queueing of all its not-yet-completed tasks.
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted).Return(1, 3, nil)
mocks.persist.EXPECT().UpdateJobsTaskStatusesConditional(ctx, job,
[]api.TaskStatus{
api.TaskStatusCanceled,
api.TaskStatusFailed,
api.TaskStatusPaused,
api.TaskStatusSoftFailed,
},
api.TaskStatusQueued,
"Queued because job transitioned status from \"active\" to \"requeueing\"",
)
mocks.expectSaveJobWithStatus(t, job, api.JobStatusQueued)
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusActive, api.JobStatusRequeueing)
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusRequeueing, api.JobStatusQueued)
require.NoError(t, sm.jobStatusChange(ctx, job, api.JobStatusRequeueing, "someone wrote a unittest"))
}
func TestJobRequeueWithAllCompletedTasks(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
_, job := taskWithStatus(api.JobStatusCompleted, api.TaskStatusCompleted)
// These are not necessary to create for this test, but just imagine these tasks are there too.
// This is mimicked by returning (3, 3, nil) when counting the tasks (3 of 3 completed).
// task2 := taskOfSameJob(task1, api.TaskStatusCompleted)
// task3 := taskOfSameJob(task2, api.TaskStatusCompleted)
call1 := mocks.expectSaveJobWithStatus(t, job, api.JobStatusRequeueing)
// Expect queueing of the job to trigger queueing of all its not-yet-completed tasks.
updateCall := mocks.persist.EXPECT().
UpdateJobsTaskStatuses(ctx, job, api.TaskStatusQueued,
"Queued because job transitioned status from \"completed\" to \"requeueing\"").
After(call1)
saveJobCall := mocks.expectSaveJobWithStatus(t, job, api.JobStatusQueued).After(updateCall)
mocks.persist.EXPECT().
CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted).
Return(0, 3, nil). // By now all tasks are queued.
After(saveJobCall)
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusCompleted, api.JobStatusRequeueing)
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusRequeueing, api.JobStatusQueued)
require.NoError(t, sm.jobStatusChange(ctx, job, api.JobStatusRequeueing, "someone wrote a unit test"))
}
func TestJobCancelWithSomeCompletedTasks(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
_, job := taskWithStatus(api.JobStatusActive, api.TaskStatusCompleted)
// task2 := taskOfSameJob(task1, api.TaskStatusFailed)
// task3 := taskOfSameJob(task2, api.TaskStatusSoftFailed)
mocks.expectSaveJobWithStatus(t, job, api.JobStatusCancelRequested)
// Expect cancelling of the job to trigger cancelling of all its could-potentially-still-run tasks.
mocks.persist.EXPECT().UpdateJobsTaskStatusesConditional(ctx, job,
[]api.TaskStatus{
api.TaskStatusActive,
api.TaskStatusQueued,
// TODO: add api.TaskStatusPaused as well, as those should get cancelled too,
api.TaskStatusSoftFailed,
},
api.TaskStatusCanceled,
"Manager cancelled this task because the job got status \"cancel-requested\".",
)
mocks.expectSaveJobWithStatus(t, job, api.JobStatusCanceled)
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusActive, api.JobStatusCancelRequested)
mocks.expectBroadcastJobChange(job, api.JobStatusCancelRequested, api.JobStatusCanceled)
require.NoError(t, sm.jobStatusChange(ctx, job, api.JobStatusCancelRequested, "someone wrote a unittest"))
}
func TestJobPauseWithAllQueuedTasks(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
task1, job := taskWithStatus(api.JobStatusQueued, api.TaskStatusQueued)
task2 := taskOfSameJob(task1, api.TaskStatusQueued)
task3 := taskOfSameJob(task2, api.TaskStatusQueued)
_ = task3
mocks.expectSaveJobWithStatus(t, job, api.JobStatusPauseRequested)
// Expect pausing of the job to trigger pausing of all its queued tasks.
mocks.persist.EXPECT().UpdateJobsTaskStatusesConditional(ctx, job,
[]api.TaskStatus{
api.TaskStatusQueued,
api.TaskStatusSoftFailed,
},
api.TaskStatusPaused,
"Manager paused this task because the job got status \"pause-requested\".",
)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job,
api.TaskStatusActive).
Return(0, 3, nil)
mocks.expectSaveJobWithStatus(t, job, api.JobStatusPaused)
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusQueued, api.JobStatusPauseRequested)
mocks.expectBroadcastJobChange(job, api.JobStatusPauseRequested, api.JobStatusPaused)
require.NoError(t, sm.jobStatusChange(ctx, job, api.JobStatusPauseRequested, "someone wrote a unittest"))
}
func TestJobPauseWithSomeCompletedTasks(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
task1, job := taskWithStatus(api.JobStatusQueued, api.TaskStatusCompleted)
task2 := taskOfSameJob(task1, api.TaskStatusQueued)
task3 := taskOfSameJob(task2, api.TaskStatusQueued)
_ = task3
mocks.expectSaveJobWithStatus(t, job, api.JobStatusPauseRequested)
// Expect pausing of the job to trigger pausing of all its queued tasks.
mocks.persist.EXPECT().UpdateJobsTaskStatusesConditional(ctx, job,
[]api.TaskStatus{
api.TaskStatusQueued,
api.TaskStatusSoftFailed,
},
api.TaskStatusPaused,
"Manager paused this task because the job got status \"pause-requested\".",
)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job,
api.TaskStatusActive).
Return(0, 3, nil)
mocks.expectSaveJobWithStatus(t, job, api.JobStatusPaused)
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusQueued, api.JobStatusPauseRequested)
mocks.expectBroadcastJobChange(job, api.JobStatusPauseRequested, api.JobStatusPaused)
require.NoError(t, sm.jobStatusChange(ctx, job, api.JobStatusPauseRequested, "someone wrote a unittest"))
}
func TestJobPauseWithSomeActiveTasks(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
task1, job := taskWithStatus(api.JobStatusActive, api.TaskStatusActive)
task2 := taskOfSameJob(task1, api.TaskStatusCompleted)
task3 := taskOfSameJob(task2, api.TaskStatusQueued)
_ = task3
mocks.expectSaveJobWithStatus(t, job, api.JobStatusPauseRequested)
// Expect pausing of the job to trigger pausing of all its queued tasks.
mocks.persist.EXPECT().UpdateJobsTaskStatusesConditional(ctx, job,
[]api.TaskStatus{
api.TaskStatusQueued,
api.TaskStatusSoftFailed,
},
api.TaskStatusPaused,
"Manager paused this task because the job got status \"pause-requested\".",
)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job,
api.TaskStatusActive).
Return(1, 3, nil)
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusActive, api.JobStatusPauseRequested)
require.NoError(t, sm.jobStatusChange(ctx, job, api.JobStatusPauseRequested, "someone wrote a unittest"))
}
func TestCheckStuck(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
task1, job := taskWithStatus(api.JobStatusActive, api.TaskStatusCompleted)
_ = task1
// task2 := taskOfSameJob(task1, api.TaskStatusFailed)
// task3 := taskOfSameJob(task2, api.TaskStatusSoftFailed)
job.Status = api.JobStatusRequeueing
mocks.persist.EXPECT().FetchJobsInStatus(ctx, api.JobStatusCancelRequested, api.JobStatusRequeueing).
Return([]*persistence.Job{job}, nil)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted).Return(1, 3, nil)
mocks.persist.EXPECT().UpdateJobsTaskStatusesConditional(ctx, job,
[]api.TaskStatus{
api.TaskStatusCanceled,
api.TaskStatusFailed,
api.TaskStatusPaused,
api.TaskStatusSoftFailed,
},
api.TaskStatusQueued,
fmt.Sprintf("Queued because job transitioned status from %q to %q", job.Status, job.Status),
)
// Expect Job -> Queued and non-completed tasks -> Queued.
mocks.expectSaveJobWithStatus(t, job, api.JobStatusRequeueing) // should be called once for the current status
mocks.expectSaveJobWithStatus(t, job, api.JobStatusQueued) // and then with the new status
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusRequeueing, api.JobStatusRequeueing)
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusRequeueing, api.JobStatusQueued)
sm.CheckStuck(ctx)
}
func mockedTaskStateMachine(mockCtrl *gomock.Controller) (*StateMachine, *StateMachineMocks) {
mocks := StateMachineMocks{
persist: mocks.NewMockPersistenceService(mockCtrl),
broadcaster: mocks.NewMockChangeBroadcaster(mockCtrl),
logStorage: mocks.NewMockLogStorage(mockCtrl),
}
sm := NewStateMachine(mocks.persist, mocks.broadcaster, mocks.logStorage)
return sm, &mocks
}
func (m *StateMachineMocks) expectFetchJobOfTask(
task *persistence.Task,
jobToReturn *persistence.Job,
) *gomock.Call {
return m.persist.EXPECT().FetchJobByID(gomock.Any(), task.JobID).Return(jobToReturn, nil)
}
func (m *StateMachineMocks) expectSaveTaskWithStatus(
t *testing.T,
task *persistence.Task,
expectTaskStatus api.TaskStatus,
) *gomock.Call {
return m.persist.EXPECT().
SaveTaskStatus(gomock.Any(), task).
DoAndReturn(func(ctx context.Context, savedTask *persistence.Task) error {
assert.Equal(t, expectTaskStatus, savedTask.Status)
return nil
})
}
func (m *StateMachineMocks) expectWriteTaskLogTimestamped(
t *testing.T,
task *persistence.Task,
jobUUID string,
logtext string,
) *gomock.Call {
return m.logStorage.EXPECT().WriteTimestamped(
gomock.Any(), jobUUID, task.UUID, logtext,
)
}
func (m *StateMachineMocks) expectSaveJobWithStatus(
t *testing.T,
job *persistence.Job,
expectJobStatus api.JobStatus,
) *gomock.Call {
return m.persist.EXPECT().
SaveJobStatus(gomock.Any(), job).
DoAndReturn(func(ctx context.Context, savedJob *persistence.Job) error {
assert.Equal(t, expectJobStatus, savedJob.Status)
return nil
})
}
func (m *StateMachineMocks) expectBroadcastJobChange(
job *persistence.Job,
fromStatus, toStatus api.JobStatus,
) *gomock.Call {
expectUpdate := api.EventJobUpdate{
Id: job.UUID,
Name: &job.Name,
PreviousStatus: &fromStatus,
RefreshTasks: false,
Status: toStatus,
Updated: job.UpdatedAt.Time,
}
return m.broadcaster.EXPECT().BroadcastJobUpdate(expectUpdate)
}
func (m *StateMachineMocks) expectBroadcastJobChangeWithTaskRefresh(
job *persistence.Job,
fromStatus, toStatus api.JobStatus,
) *gomock.Call {
expectUpdate := api.EventJobUpdate{
Id: job.UUID,
Name: &job.Name,
PreviousStatus: &fromStatus,
RefreshTasks: true,
Status: toStatus,
Updated: job.UpdatedAt.Time,
}
return m.broadcaster.EXPECT().BroadcastJobUpdate(expectUpdate)
}
func (m *StateMachineMocks) expectBroadcastTaskChange(
task *persistence.Task,
jobUUID string,
fromStatus, toStatus api.TaskStatus,
) *gomock.Call {
expectUpdate := api.EventTaskUpdate{
Id: task.UUID,
JobId: jobUUID,
Name: task.Name,
Updated: task.UpdatedAt.Time,
PreviousStatus: &fromStatus,
Status: toStatus,
}
return m.broadcaster.EXPECT().BroadcastTaskUpdate(expectUpdate)
}
/* taskWithStatus() creates a task of a certain status, with a job of a certain status. */
func taskWithStatus(jobStatus api.JobStatus, taskStatus api.TaskStatus) (*persistence.Task, *persistence.Job) {
job := persistence.Job{
ID: 47,
UUID: "test-job-f3f5-4cef-9cd7-e67eb28eaf3e",
Status: jobStatus,
}
task := persistence.Task{
ID: 327,
JobID: job.ID,
UUID: "testtask-0001-4e28-aeea-8cbaf2fc96a5",
Status: taskStatus,
}
return &task, &job
}
/* taskOfSameJob() creates a task of a certain status, on the same job as the given task. */
func taskOfSameJob(task *persistence.Task, taskStatus api.TaskStatus) *persistence.Task {
newTaskID := task.ID + 1
return &persistence.Task{
ID: newTaskID,
UUID: fmt.Sprintf("testtask-%04d-4e28-aeea-8cbaf2fc96a5", newTaskID),
JobID: task.JobID,
Status: taskStatus,
}
}
func taskStateMachineTestFixtures(t *testing.T) (*gomock.Controller, context.Context, *StateMachine, *StateMachineMocks) {
mockCtrl := gomock.NewController(t)
ctx := context.Background()
sm, mocks := mockedTaskStateMachine(mockCtrl)
return mockCtrl, ctx, sm, mocks
}