Implement mass updating of tasks when JobUpdate.refresh_tasks = true

Send & handle `JobUpdate.refresh_tasks = true` when many tasks are
updated simultaneously. This applies to things like cancelling &
requeueing an entire job.

This partially rolls back 67bf77de13d99b1bc5d7344951068822c4fadd88, as
it was too slow when 1000+ tasks were being updated all at once.
This commit is contained in:
Sybren A. Stüvel 2022-05-17 14:48:50 +02:00
parent cd8080fb44
commit 530520b1c7
5 changed files with 229 additions and 181 deletions

View File

@ -346,3 +346,42 @@ func (db *DB) FetchTasksOfJobInStatus(ctx context.Context, job *Job, taskStatuse
return tasks, nil return tasks, nil
} }
// UpdateJobsTaskStatuses updates the status & activity of all tasks of `job`.
func (db *DB) UpdateJobsTaskStatuses(ctx context.Context, job *Job,
taskStatus api.TaskStatus, activity string) error {
if taskStatus == "" {
return taskError(nil, "empty status not allowed")
}
tx := db.gormDB.WithContext(ctx).
Model(Task{}).
Where("job_Id = ?", job.ID).
Updates(Task{Status: taskStatus, Activity: activity})
if tx.Error != nil {
return taskError(tx.Error, "updating status of all tasks of job %s", job.UUID)
}
return nil
}
// UpdateJobsTaskStatusesConditional updates the status & activity of the tasks of `job`,
// limited to those tasks with status in `statusesToUpdate`.
func (db *DB) UpdateJobsTaskStatusesConditional(ctx context.Context, job *Job,
statusesToUpdate []api.TaskStatus, taskStatus api.TaskStatus, activity string) error {
if taskStatus == "" {
return taskError(nil, "empty status not allowed")
}
tx := db.gormDB.WithContext(ctx).
Model(Task{}).
Where("job_Id = ?", job.ID).
Where("status in ?", statusesToUpdate).
Updates(Task{Status: taskStatus, Activity: activity})
if tx.Error != nil {
return taskError(tx.Error, "updating status of all tasks in status %v of job %s", statusesToUpdate, job.UUID)
}
return nil
}

View File

@ -77,41 +77,6 @@ func (mr *MockPersistenceServiceMockRecorder) FetchJobsInStatus(arg0 interface{}
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchJobsInStatus", reflect.TypeOf((*MockPersistenceService)(nil).FetchJobsInStatus), varargs...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchJobsInStatus", reflect.TypeOf((*MockPersistenceService)(nil).FetchJobsInStatus), varargs...)
} }
// FetchTasksOfJob mocks base method.
func (m *MockPersistenceService) FetchTasksOfJob(arg0 context.Context, arg1 *persistence.Job) ([]*persistence.Task, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FetchTasksOfJob", arg0, arg1)
ret0, _ := ret[0].([]*persistence.Task)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FetchTasksOfJob indicates an expected call of FetchTasksOfJob.
func (mr *MockPersistenceServiceMockRecorder) FetchTasksOfJob(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchTasksOfJob", reflect.TypeOf((*MockPersistenceService)(nil).FetchTasksOfJob), arg0, arg1)
}
// FetchTasksOfJobInStatus mocks base method.
func (m *MockPersistenceService) FetchTasksOfJobInStatus(arg0 context.Context, arg1 *persistence.Job, arg2 ...api.TaskStatus) ([]*persistence.Task, error) {
m.ctrl.T.Helper()
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "FetchTasksOfJobInStatus", varargs...)
ret0, _ := ret[0].([]*persistence.Task)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FetchTasksOfJobInStatus indicates an expected call of FetchTasksOfJobInStatus.
func (mr *MockPersistenceServiceMockRecorder) FetchTasksOfJobInStatus(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchTasksOfJobInStatus", reflect.TypeOf((*MockPersistenceService)(nil).FetchTasksOfJobInStatus), varargs...)
}
// JobHasTasksInStatus mocks base method. // JobHasTasksInStatus mocks base method.
func (m *MockPersistenceService) JobHasTasksInStatus(arg0 context.Context, arg1 *persistence.Job, arg2 api.TaskStatus) (bool, error) { func (m *MockPersistenceService) JobHasTasksInStatus(arg0 context.Context, arg1 *persistence.Job, arg2 api.TaskStatus) (bool, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
@ -155,6 +120,34 @@ func (mr *MockPersistenceServiceMockRecorder) SaveTask(arg0, arg1 interface{}) *
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveTask", reflect.TypeOf((*MockPersistenceService)(nil).SaveTask), arg0, arg1) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveTask", reflect.TypeOf((*MockPersistenceService)(nil).SaveTask), arg0, arg1)
} }
// UpdateJobsTaskStatuses mocks base method.
func (m *MockPersistenceService) UpdateJobsTaskStatuses(arg0 context.Context, arg1 *persistence.Job, arg2 api.TaskStatus, arg3 string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UpdateJobsTaskStatuses", arg0, arg1, arg2, arg3)
ret0, _ := ret[0].(error)
return ret0
}
// UpdateJobsTaskStatuses indicates an expected call of UpdateJobsTaskStatuses.
func (mr *MockPersistenceServiceMockRecorder) UpdateJobsTaskStatuses(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateJobsTaskStatuses", reflect.TypeOf((*MockPersistenceService)(nil).UpdateJobsTaskStatuses), arg0, arg1, arg2, arg3)
}
// UpdateJobsTaskStatusesConditional mocks base method.
func (m *MockPersistenceService) UpdateJobsTaskStatusesConditional(arg0 context.Context, arg1 *persistence.Job, arg2 []api.TaskStatus, arg3 api.TaskStatus, arg4 string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UpdateJobsTaskStatusesConditional", arg0, arg1, arg2, arg3, arg4)
ret0, _ := ret[0].(error)
return ret0
}
// UpdateJobsTaskStatusesConditional indicates an expected call of UpdateJobsTaskStatusesConditional.
func (mr *MockPersistenceServiceMockRecorder) UpdateJobsTaskStatusesConditional(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateJobsTaskStatusesConditional", reflect.TypeOf((*MockPersistenceService)(nil).UpdateJobsTaskStatusesConditional), arg0, arg1, arg2, arg3, arg4)
}
// MockChangeBroadcaster is a mock of ChangeBroadcaster interface. // MockChangeBroadcaster is a mock of ChangeBroadcaster interface.
type MockChangeBroadcaster struct { type MockChangeBroadcaster struct {
ctrl *gomock.Controller ctrl *gomock.Controller

View File

@ -34,8 +34,14 @@ type PersistenceService interface {
JobHasTasksInStatus(ctx context.Context, job *persistence.Job, taskStatus api.TaskStatus) (bool, error) JobHasTasksInStatus(ctx context.Context, job *persistence.Job, taskStatus api.TaskStatus) (bool, error)
CountTasksOfJobInStatus(ctx context.Context, job *persistence.Job, taskStatuses ...api.TaskStatus) (numInStatus, numTotal int, err error) CountTasksOfJobInStatus(ctx context.Context, job *persistence.Job, taskStatuses ...api.TaskStatus) (numInStatus, numTotal int, err error)
FetchTasksOfJob(ctx context.Context, job *persistence.Job) ([]*persistence.Task, error) // UpdateJobsTaskStatuses updates the status & activity of the tasks of `job`.
FetchTasksOfJobInStatus(ctx context.Context, job *persistence.Job, taskStatuses ...api.TaskStatus) ([]*persistence.Task, error) UpdateJobsTaskStatuses(ctx context.Context, job *persistence.Job,
taskStatus api.TaskStatus, activity string) error
// UpdateJobsTaskStatusesConditional updates the status & activity of the tasks of `job`,
// limited to those tasks with status in `statusesToUpdate`.
UpdateJobsTaskStatusesConditional(ctx context.Context, job *persistence.Job,
statusesToUpdate []api.TaskStatus, taskStatus api.TaskStatus, activity string) error
FetchJobsInStatus(ctx context.Context, jobStatuses ...api.JobStatus) ([]*persistence.Job, error) FetchJobsInStatus(ctx context.Context, jobStatuses ...api.JobStatus) ([]*persistence.Job, error)
} }
@ -346,7 +352,7 @@ func (sm *StateMachine) jobStatusSet(ctx context.Context,
} }
// Handle the status change. // Handle the status change.
newJobStatus, err = sm.updateTasksAfterJobStatusChange(ctx, logger, job, oldJobStatus) result, err := sm.updateTasksAfterJobStatusChange(ctx, logger, job, oldJobStatus)
if err != nil { if err != nil {
return "", fmt.Errorf("updating job's tasks after job status change: %w", err) return "", fmt.Errorf("updating job's tasks after job status change: %w", err)
} }
@ -354,9 +360,21 @@ func (sm *StateMachine) jobStatusSet(ctx context.Context,
// Broadcast this change to the SocketIO clients. // Broadcast this change to the SocketIO clients.
jobUpdate := webupdates.NewJobUpdate(job) jobUpdate := webupdates.NewJobUpdate(job)
jobUpdate.PreviousStatus = &oldJobStatus jobUpdate.PreviousStatus = &oldJobStatus
jobUpdate.RefreshTasks = result.massTaskUpdate
sm.broadcaster.BroadcastJobUpdate(jobUpdate) sm.broadcaster.BroadcastJobUpdate(jobUpdate)
return newJobStatus, nil return result.followingJobStatus, nil
}
// tasksUpdateResult is returned by `updateTasksAfterJobStatusChange`.
type tasksUpdateResult struct {
// FollowingJobStatus is set when the task updates should trigger another job status update.
followingJobStatus api.JobStatus
// massTaskUpdate is true when multiple/all tasks were updated simultaneously.
// This hasn't triggered individual task updates to SocketIO clients, and thus
// the resulting SocketIO job update should indicate all tasks must be
// reloaded.
massTaskUpdate bool
} }
// updateTasksAfterJobStatusChange updates the status of its tasks based on the // updateTasksAfterJobStatusChange updates the status of its tasks based on the
@ -371,31 +389,43 @@ func (sm *StateMachine) updateTasksAfterJobStatusChange(
logger zerolog.Logger, logger zerolog.Logger,
job *persistence.Job, job *persistence.Job,
oldJobStatus api.JobStatus, oldJobStatus api.JobStatus,
) (api.JobStatus, error) { ) (tasksUpdateResult, error) {
// Every case in this switch MUST return, for sanity sake. // Every case in this switch MUST return, for sanity sake.
switch job.Status { switch job.Status {
case api.JobStatusCompleted, api.JobStatusCanceled: case api.JobStatusCompleted, api.JobStatusCanceled:
// Nothing to do; this will happen as a response to all tasks receiving this status. // Nothing to do; this will happen as a response to all tasks receiving this status.
return "", nil return tasksUpdateResult{}, nil
case api.JobStatusActive: case api.JobStatusActive:
// Nothing to do; this happens when a task gets started, which has nothing to // Nothing to do; this happens when a task gets started, which has nothing to
// do with other tasks in the job. // do with other tasks in the job.
return "", nil return tasksUpdateResult{}, nil
case api.JobStatusCancelRequested, api.JobStatusFailed: case api.JobStatusCancelRequested, api.JobStatusFailed:
return sm.cancelTasks(ctx, logger, job) jobStatus, err := sm.cancelTasks(ctx, logger, job)
return tasksUpdateResult{
followingJobStatus: jobStatus,
massTaskUpdate: true,
}, err
case api.JobStatusRequeued: case api.JobStatusRequeued:
return sm.requeueTasks(ctx, logger, job, oldJobStatus) jobStatus, err := sm.requeueTasks(ctx, logger, job, oldJobStatus)
return tasksUpdateResult{
followingJobStatus: jobStatus,
massTaskUpdate: true,
}, err
case api.JobStatusQueued: case api.JobStatusQueued:
return sm.checkTaskCompletion(ctx, logger, job) jobStatus, err := sm.checkTaskCompletion(ctx, logger, job)
return tasksUpdateResult{
followingJobStatus: jobStatus,
massTaskUpdate: true,
}, err
default: default:
logger.Warn().Msg("unknown job status change, ignoring") logger.Warn().Msg("unknown job status change, ignoring")
return "", nil return tasksUpdateResult{}, nil
} }
} }
@ -408,16 +438,15 @@ func (sm *StateMachine) cancelTasks(
logger.Info().Msg("cancelling tasks of job") logger.Info().Msg("cancelling tasks of job")
// Any task that is running or might run in the future should get cancelled. // Any task that is running or might run in the future should get cancelled.
tasks, err := sm.persist.FetchTasksOfJobInStatus(ctx, job, taskStatusesToCancel := []api.TaskStatus{
api.TaskStatusActive, api.TaskStatusActive,
api.TaskStatusQueued, api.TaskStatusQueued,
api.TaskStatusSoftFailed, api.TaskStatusSoftFailed,
)
if err != nil {
return "", err
} }
activity := fmt.Sprintf("Manager cancelled this task because the job got status %q.", job.Status) err := sm.persist.UpdateJobsTaskStatusesConditional(
err = sm.massUpdateTaskStatus(ctx, tasks, api.TaskStatusCanceled, activity) ctx, job, taskStatusesToCancel, api.TaskStatusCanceled,
fmt.Sprintf("Manager cancelled this task because the job got status %q.", job.Status),
)
if err != nil { if err != nil {
return "", fmt.Errorf("cancelling tasks of job %s: %w", job.UUID, err) return "", fmt.Errorf("cancelling tasks of job %s: %w", job.UUID, err)
} }
@ -447,7 +476,6 @@ func (sm *StateMachine) requeueTasks(
} }
var err error var err error
var tasks []*persistence.Task
switch oldJobStatus { switch oldJobStatus {
case api.JobStatusUnderConstruction: case api.JobStatusUnderConstruction:
@ -457,19 +485,22 @@ func (sm *StateMachine) requeueTasks(
return "", nil return "", nil
case api.JobStatusCompleted: case api.JobStatusCompleted:
// Re-queue all tasks. // Re-queue all tasks.
tasks, err = sm.persist.FetchTasksOfJob(ctx, job) err = sm.persist.UpdateJobsTaskStatuses(ctx, job, api.TaskStatusQueued,
fmt.Sprintf("Queued because job transitioned status from %q to %q", oldJobStatus, job.Status))
default: default:
statusesToUpdate := []api.TaskStatus{
api.TaskStatusCanceled,
api.TaskStatusFailed,
api.TaskStatusPaused,
api.TaskStatusSoftFailed,
}
// Re-queue only the non-completed tasks. // Re-queue only the non-completed tasks.
tasks, err = sm.persist.FetchTasksOfJobInStatus(ctx, job, nonCompletedStatuses...) err = sm.persist.UpdateJobsTaskStatusesConditional(ctx, job,
statusesToUpdate, api.TaskStatusQueued,
fmt.Sprintf("Queued because job transitioned status from %q to %q", oldJobStatus, job.Status))
} }
if err != nil { if err != nil {
return "", err return "", fmt.Errorf("queueing tasks of job %s: %w", job.UUID, err)
}
activity := fmt.Sprintf("Queued because job transitioned status from %q to %q", oldJobStatus, job.Status)
err = sm.massUpdateTaskStatus(ctx, tasks, api.TaskStatusQueued, activity)
if err != nil {
return "", err
} }
// TODO: also reset the 'failed by workers' blacklist. // TODO: also reset the 'failed by workers' blacklist.
@ -478,28 +509,6 @@ func (sm *StateMachine) requeueTasks(
return api.JobStatusQueued, nil return api.JobStatusQueued, nil
} }
// massUpdateTaskStatus updates the status of all the given tasks.
// NOTE: these task statuses do NOT affect the job status.
// Tasks that are passed in the `tasks` parameter but already have the given status will be skipped.
func (sm *StateMachine) massUpdateTaskStatus(
ctx context.Context,
tasks []*persistence.Task,
status api.TaskStatus,
activity string,
) error {
for _, task := range tasks {
if task.Status == status {
continue
}
task.Activity = activity
err := sm.taskStatusChangeOnly(ctx, task, status)
if err != nil {
return err
}
}
return nil
}
// checkTaskCompletion returns "completed" as next job status when all tasks of // checkTaskCompletion returns "completed" as next job status when all tasks of
// the job are completed. // the job are completed.
// //

View File

@ -116,28 +116,25 @@ func TestTaskStatusChangeActiveToFailedFailJob(t *testing.T) {
// T: active > failed (10% task1 failure) --> J: active > failed + cancellation of any runnable tasks. // T: active > failed (10% task1 failure) --> J: active > failed + cancellation of any runnable tasks.
task1 := taskWithStatus(api.JobStatusActive, api.TaskStatusActive) task1 := taskWithStatus(api.JobStatusActive, api.TaskStatusActive)
task2 := taskOfSameJob(task1, api.TaskStatusFailed)
task3 := taskOfSameJob(task2, api.TaskStatusSoftFailed)
remainingTasks := []*persistence.Task{task2, task3}
mocks.expectSaveTaskWithStatus(t, task1, api.TaskStatusFailed) mocks.expectSaveTaskWithStatus(t, task1, api.TaskStatusFailed)
// The change to the failed task should be broadcast.
mocks.expectBroadcastTaskChange(task1, api.TaskStatusActive, api.TaskStatusFailed) mocks.expectBroadcastTaskChange(task1, api.TaskStatusActive, api.TaskStatusFailed)
mocks.expectSaveJobWithStatus(t, task1.Job, api.JobStatusFailed) mocks.expectSaveJobWithStatus(t, task1.Job, api.JobStatusFailed)
mocks.expectBroadcastJobChange(task1.Job, api.JobStatusActive, api.JobStatusFailed) // The resulting cancellation of the other tasks should be communicated as mass-task-update in the job update broadcast.
mocks.expectBroadcastJobChangeWithTaskRefresh(task1.Job, api.JobStatusActive, api.JobStatusFailed)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, task1.Job, api.TaskStatusFailed).Return(10, 100, nil) // 10 out of 100 failed. mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, task1.Job, api.TaskStatusFailed).Return(10, 100, nil) // 10 out of 100 failed.
// Expect failure of the job to trigger cancellation of remaining tasks. // Expect failure of the job to trigger cancellation of remaining tasks.
mocks.persist.EXPECT().FetchTasksOfJobInStatus(ctx, task1.Job, taskStatusesToCancel := []api.TaskStatus{
api.TaskStatusActive, api.TaskStatusActive,
api.TaskStatusQueued, api.TaskStatusQueued,
api.TaskStatusSoftFailed, api.TaskStatusSoftFailed,
).Return(remainingTasks, nil) }
mocks.expectSaveTaskWithStatus(t, task2, api.TaskStatusCanceled)
mocks.expectSaveTaskWithStatus(t, task3, api.TaskStatusCanceled)
mocks.expectBroadcastTaskChange(task2, api.TaskStatusFailed, api.TaskStatusCanceled) mocks.persist.EXPECT().UpdateJobsTaskStatusesConditional(ctx, task1.Job, taskStatusesToCancel, api.TaskStatusCanceled,
mocks.expectBroadcastTaskChange(task3, api.TaskStatusSoftFailed, api.TaskStatusCanceled) "Manager cancelled this task because the job got status \"failed\".",
)
assert.NoError(t, sm.TaskStatusChange(ctx, task1, api.TaskStatusFailed)) assert.NoError(t, sm.TaskStatusChange(ctx, task1, api.TaskStatusFailed))
} }
@ -147,30 +144,22 @@ func TestTaskStatusChangeRequeueOnCompletedJob(t *testing.T) {
defer mockCtrl.Finish() defer mockCtrl.Finish()
// T: completed > queued --> J: completed > requeued > queued // T: completed > queued --> J: completed > requeued > queued
task1 := taskWithStatus(api.JobStatusCompleted, api.TaskStatusCompleted) task := taskWithStatus(api.JobStatusCompleted, api.TaskStatusCompleted)
task2 := taskOfSameJob(task1, api.TaskStatusCompleted) mocks.expectSaveTaskWithStatus(t, task, api.TaskStatusQueued)
task3 := taskOfSameJob(task2, api.TaskStatusCompleted) mocks.expectBroadcastTaskChange(task, api.TaskStatusCompleted, api.TaskStatusQueued)
allTaskIDs := []*persistence.Task{task1, task2, task3} mocks.expectSaveJobWithStatus(t, task.Job, api.JobStatusRequeued)
mocks.expectBroadcastJobChangeWithTaskRefresh(task.Job, api.JobStatusCompleted, api.JobStatusRequeued)
mocks.expectSaveTaskWithStatus(t, task1, api.TaskStatusQueued) mocks.expectBroadcastJobChangeWithTaskRefresh(task.Job, api.JobStatusRequeued, api.JobStatusQueued)
mocks.expectBroadcastTaskChange(task1, api.TaskStatusCompleted, api.TaskStatusQueued)
mocks.expectSaveJobWithStatus(t, task1.Job, api.JobStatusRequeued)
mocks.expectBroadcastJobChange(task1.Job, api.JobStatusCompleted, api.JobStatusRequeued)
mocks.expectBroadcastJobChange(task1.Job, api.JobStatusRequeued, api.JobStatusQueued)
// Expect queueing of the job to trigger queueing of all its tasks, if those tasks were all completed before. // Expect queueing of the job to trigger queueing of all its tasks, if those tasks were all completed before.
// 2 out of 3 completed, because one was just queued. // 2 out of 3 completed, because one was just queued.
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, task1.Job, api.TaskStatusCompleted).Return(2, 3, nil) mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, task.Job, api.TaskStatusCompleted).Return(2, 3, nil)
fetchCall := mocks.persist.EXPECT().FetchTasksOfJob(ctx, task1.Job).Return(allTaskIDs, nil) mocks.persist.EXPECT().UpdateJobsTaskStatuses(ctx, task.Job, api.TaskStatusQueued,
mocks.expectSaveTaskWithStatus(t, task2, api.TaskStatusQueued).After(fetchCall) "Queued because job transitioned status from \"completed\" to \"requeued\"",
mocks.expectSaveTaskWithStatus(t, task3, api.TaskStatusQueued).After(fetchCall) )
mocks.expectSaveJobWithStatus(t, task.Job, api.JobStatusQueued)
mocks.expectBroadcastTaskChange(task2, api.TaskStatusCompleted, api.TaskStatusQueued) assert.NoError(t, sm.TaskStatusChange(ctx, task, api.TaskStatusQueued))
mocks.expectBroadcastTaskChange(task3, api.TaskStatusCompleted, api.TaskStatusQueued)
mocks.expectSaveJobWithStatus(t, task1.Job, api.JobStatusQueued)
assert.NoError(t, sm.TaskStatusChange(ctx, task1, api.TaskStatusQueued))
} }
func TestTaskStatusChangeCancelSingleTask(t *testing.T) { func TestTaskStatusChangeCancelSingleTask(t *testing.T) {
@ -241,31 +230,31 @@ func TestJobRequeueWithSomeCompletedTasks(t *testing.T) {
defer mockCtrl.Finish() defer mockCtrl.Finish()
task1 := taskWithStatus(api.JobStatusActive, api.TaskStatusCompleted) task1 := taskWithStatus(api.JobStatusActive, api.TaskStatusCompleted)
task2 := taskOfSameJob(task1, api.TaskStatusFailed) // These are not necessary to create for this test, but just imagine these tasks are there too.
task3 := taskOfSameJob(task2, api.TaskStatusSoftFailed) // This is mimicked by returning (1, 3, nil) when counting the tasks (1 of 3 completed).
notCompleteTasks := []*persistence.Task{task2, task3} // task2 := taskOfSameJob(task1, api.TaskStatusFailed)
// task3 := taskOfSameJob(task2, api.TaskStatusSoftFailed)
job := task1.Job job := task1.Job
mocks.expectSaveJobWithStatus(t, job, api.JobStatusRequeued) mocks.expectSaveJobWithStatus(t, job, api.JobStatusRequeued)
// Expect queueing of the job to trigger queueing of all its not-yet-completed tasks. // Expect queueing of the job to trigger queueing of all its not-yet-completed tasks.
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted).Return(1, 3, nil) mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted).Return(1, 3, nil)
mocks.persist.EXPECT().FetchTasksOfJobInStatus(ctx, job, mocks.persist.EXPECT().UpdateJobsTaskStatusesConditional(ctx, job,
[]api.TaskStatus{
api.TaskStatusCanceled, api.TaskStatusCanceled,
api.TaskStatusFailed, api.TaskStatusFailed,
api.TaskStatusPaused, api.TaskStatusPaused,
api.TaskStatusSoftFailed, api.TaskStatusSoftFailed,
).Return(notCompleteTasks, nil) },
api.TaskStatusQueued,
"Queued because job transitioned status from \"active\" to \"requeued\"",
)
mocks.expectSaveTaskWithStatus(t, task2, api.TaskStatusQueued)
mocks.expectSaveTaskWithStatus(t, task3, api.TaskStatusQueued)
mocks.expectSaveJobWithStatus(t, job, api.JobStatusQueued) mocks.expectSaveJobWithStatus(t, job, api.JobStatusQueued)
mocks.expectBroadcastJobChange(job, api.JobStatusActive, api.JobStatusRequeued) mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusActive, api.JobStatusRequeued)
mocks.expectBroadcastJobChange(job, api.JobStatusRequeued, api.JobStatusQueued) mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusRequeued, api.JobStatusQueued)
mocks.expectBroadcastTaskChange(task2, api.TaskStatusFailed, api.TaskStatusQueued)
mocks.expectBroadcastTaskChange(task3, api.TaskStatusSoftFailed, api.TaskStatusQueued)
assert.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusRequeued, "someone wrote a unittest")) assert.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusRequeued, "someone wrote a unittest"))
} }
@ -275,35 +264,29 @@ func TestJobRequeueWithAllCompletedTasks(t *testing.T) {
defer mockCtrl.Finish() defer mockCtrl.Finish()
task1 := taskWithStatus(api.JobStatusCompleted, api.TaskStatusCompleted) task1 := taskWithStatus(api.JobStatusCompleted, api.TaskStatusCompleted)
task2 := taskOfSameJob(task1, api.TaskStatusCompleted) // These are not necessary to create for this test, but just imagine these tasks are there too.
task3 := taskOfSameJob(task2, api.TaskStatusCompleted) // This is mimicked by returning (3, 3, nil) when counting the tasks (3 of 3 completed).
allTasks := []*persistence.Task{task1, task2, task3} // task2 := taskOfSameJob(task1, api.TaskStatusCompleted)
// task3 := taskOfSameJob(task2, api.TaskStatusCompleted)
job := task1.Job job := task1.Job
call1 := mocks.expectSaveJobWithStatus(t, job, api.JobStatusRequeued) call1 := mocks.expectSaveJobWithStatus(t, job, api.JobStatusRequeued)
// Expect queueing of the job to trigger queueing of all its not-yet-completed tasks. // Expect queueing of the job to trigger queueing of all its not-yet-completed tasks.
fetchCall := mocks.persist.EXPECT().FetchTasksOfJob(ctx, job). updateCall := mocks.persist.EXPECT().
Return(allTasks, nil). UpdateJobsTaskStatuses(ctx, job, api.TaskStatusQueued,
"Queued because job transitioned status from \"completed\" to \"requeued\"").
After(call1) After(call1)
mocks.expectSaveTaskWithStatus(t, task1, api.TaskStatusQueued).After(fetchCall) saveJobCall := mocks.expectSaveJobWithStatus(t, job, api.JobStatusQueued).After(updateCall)
mocks.expectSaveTaskWithStatus(t, task2, api.TaskStatusQueued).After(fetchCall)
mocks.expectSaveTaskWithStatus(t, task3, api.TaskStatusQueued).After(fetchCall)
saveJobCall := mocks.expectSaveJobWithStatus(t, job, api.JobStatusQueued).After(fetchCall)
mocks.persist.EXPECT(). mocks.persist.EXPECT().
CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted). CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted).
Return(0, 3, nil). // By now all tasks are queued. Return(0, 3, nil). // By now all tasks are queued.
After(saveJobCall) After(saveJobCall)
mocks.expectBroadcastJobChange(job, api.JobStatusCompleted, api.JobStatusRequeued) mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusCompleted, api.JobStatusRequeued)
mocks.expectBroadcastJobChange(job, api.JobStatusRequeued, api.JobStatusQueued) mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusRequeued, api.JobStatusQueued)
mocks.expectBroadcastTaskChange(task1, api.TaskStatusCompleted, api.TaskStatusQueued)
mocks.expectBroadcastTaskChange(task2, api.TaskStatusCompleted, api.TaskStatusQueued)
mocks.expectBroadcastTaskChange(task3, api.TaskStatusCompleted, api.TaskStatusQueued)
assert.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusRequeued, "someone wrote a unit test")) assert.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusRequeued, "someone wrote a unit test"))
} }
@ -313,29 +296,29 @@ func TestJobCancelWithSomeCompletedTasks(t *testing.T) {
defer mockCtrl.Finish() defer mockCtrl.Finish()
task1 := taskWithStatus(api.JobStatusActive, api.TaskStatusCompleted) task1 := taskWithStatus(api.JobStatusActive, api.TaskStatusCompleted)
task2 := taskOfSameJob(task1, api.TaskStatusFailed) // task2 := taskOfSameJob(task1, api.TaskStatusFailed)
task3 := taskOfSameJob(task2, api.TaskStatusSoftFailed) // task3 := taskOfSameJob(task2, api.TaskStatusSoftFailed)
job := task1.Job job := task1.Job
potentialRunTasks := []*persistence.Task{task2, task3}
mocks.expectSaveJobWithStatus(t, job, api.JobStatusCancelRequested) mocks.expectSaveJobWithStatus(t, job, api.JobStatusCancelRequested)
// Expect cancelling of the job to trigger cancelling of all its could-potentially-still-run tasks. // Expect cancelling of the job to trigger cancelling of all its could-potentially-still-run tasks.
fetchCall := mocks.persist.EXPECT().FetchTasksOfJobInStatus(ctx, job, mocks.persist.EXPECT().UpdateJobsTaskStatusesConditional(ctx, job,
[]api.TaskStatus{
api.TaskStatusActive, api.TaskStatusActive,
api.TaskStatusQueued, api.TaskStatusQueued,
// TODO: add api.TaskStatusPaused as well, as those should get cancelled too,
api.TaskStatusSoftFailed, api.TaskStatusSoftFailed,
).Return(potentialRunTasks, nil) },
mocks.expectSaveTaskWithStatus(t, task2, api.TaskStatusCanceled).After(fetchCall) api.TaskStatusCanceled,
mocks.expectSaveTaskWithStatus(t, task3, api.TaskStatusCanceled).After(fetchCall) "Manager cancelled this task because the job got status \"cancel-requested\".",
mocks.expectSaveJobWithStatus(t, job, api.JobStatusCanceled).After(fetchCall) )
mocks.expectBroadcastJobChange(job, api.JobStatusActive, api.JobStatusCancelRequested) mocks.expectSaveJobWithStatus(t, job, api.JobStatusCanceled)
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusActive, api.JobStatusCancelRequested)
mocks.expectBroadcastJobChange(job, api.JobStatusCancelRequested, api.JobStatusCanceled) mocks.expectBroadcastJobChange(job, api.JobStatusCancelRequested, api.JobStatusCanceled)
mocks.expectBroadcastTaskChange(task2, api.TaskStatusFailed, api.TaskStatusCanceled)
mocks.expectBroadcastTaskChange(task3, api.TaskStatusSoftFailed, api.TaskStatusCanceled)
assert.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusCancelRequested, "someone wrote a unittest")) assert.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusCancelRequested, "someone wrote a unittest"))
} }
@ -344,32 +327,32 @@ func TestCheckStuck(t *testing.T) {
defer mockCtrl.Finish() defer mockCtrl.Finish()
task1 := taskWithStatus(api.JobStatusActive, api.TaskStatusCompleted) task1 := taskWithStatus(api.JobStatusActive, api.TaskStatusCompleted)
task2 := taskOfSameJob(task1, api.TaskStatusFailed) // task2 := taskOfSameJob(task1, api.TaskStatusFailed)
task3 := taskOfSameJob(task2, api.TaskStatusSoftFailed) // task3 := taskOfSameJob(task2, api.TaskStatusSoftFailed)
job := task1.Job job := task1.Job
job.Status = api.JobStatusRequeued job.Status = api.JobStatusRequeued
mocks.persist.EXPECT().FetchJobsInStatus(ctx, api.JobStatusCancelRequested, api.JobStatusRequeued). mocks.persist.EXPECT().FetchJobsInStatus(ctx, api.JobStatusCancelRequested, api.JobStatusRequeued).
Return([]*persistence.Job{job}, nil) Return([]*persistence.Job{job}, nil)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted).Return(2, 3, nil) mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted).Return(1, 3, nil)
mocks.persist.EXPECT().FetchTasksOfJobInStatus(ctx, job,
mocks.persist.EXPECT().UpdateJobsTaskStatusesConditional(ctx, job,
[]api.TaskStatus{
api.TaskStatusCanceled, api.TaskStatusCanceled,
api.TaskStatusFailed, api.TaskStatusFailed,
api.TaskStatusPaused, api.TaskStatusPaused,
api.TaskStatusSoftFailed, api.TaskStatusSoftFailed,
). },
Return([]*persistence.Task{task2, task3}, nil) api.TaskStatusQueued,
fmt.Sprintf("Queued because job transitioned status from %q to %q", job.Status, job.Status),
)
// Expect Job -> Queued and non-completed tasks -> Queued. // Expect Job -> Queued and non-completed tasks -> Queued.
mocks.expectSaveJobWithStatus(t, job, api.JobStatusRequeued) // should be called once for the current status mocks.expectSaveJobWithStatus(t, job, api.JobStatusRequeued) // should be called once for the current status
mocks.expectSaveJobWithStatus(t, job, api.JobStatusQueued) // and then with the new status mocks.expectSaveJobWithStatus(t, job, api.JobStatusQueued) // and then with the new status
mocks.expectSaveTaskWithStatus(t, task2, api.TaskStatusQueued)
mocks.expectSaveTaskWithStatus(t, task3, api.TaskStatusQueued)
mocks.expectBroadcastJobChange(job, api.JobStatusRequeued, api.JobStatusRequeued) mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusRequeued, api.JobStatusRequeued)
mocks.expectBroadcastJobChange(job, api.JobStatusRequeued, api.JobStatusQueued) mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusRequeued, api.JobStatusQueued)
mocks.expectBroadcastTaskChange(task2, api.TaskStatusFailed, api.TaskStatusQueued)
mocks.expectBroadcastTaskChange(task3, api.TaskStatusSoftFailed, api.TaskStatusQueued)
sm.CheckStuck(ctx) sm.CheckStuck(ctx)
} }
@ -416,9 +399,25 @@ func (m *StateMachineMocks) expectBroadcastJobChange(
expectUpdate := api.JobUpdate{ expectUpdate := api.JobUpdate{
Id: job.UUID, Id: job.UUID,
Name: &job.Name, Name: &job.Name,
Updated: job.UpdatedAt,
PreviousStatus: &fromStatus, PreviousStatus: &fromStatus,
RefreshTasks: false,
Status: toStatus, Status: toStatus,
Updated: job.UpdatedAt,
}
return m.broadcaster.EXPECT().BroadcastJobUpdate(expectUpdate)
}
func (m *StateMachineMocks) expectBroadcastJobChangeWithTaskRefresh(
job *persistence.Job,
fromStatus, toStatus api.JobStatus,
) *gomock.Call {
expectUpdate := api.JobUpdate{
Id: job.UUID,
Name: &job.Name,
PreviousStatus: &fromStatus,
RefreshTasks: true,
Status: toStatus,
Updated: job.UpdatedAt,
} }
return m.broadcaster.EXPECT().BroadcastJobUpdate(expectUpdate) return m.broadcaster.EXPECT().BroadcastJobUpdate(expectUpdate)
} }

View File

@ -95,14 +95,22 @@ export default {
// SocketIO data event handlers: // SocketIO data event handlers:
onSioJobUpdate(jobUpdate) { onSioJobUpdate(jobUpdate) {
console.log("job update", jobUpdate);
if (this.$refs.jobsTable) { if (this.$refs.jobsTable) {
if (jobUpdate.previous_status) if (jobUpdate.previous_status)
this.$refs.jobsTable.processJobUpdate(jobUpdate); this.$refs.jobsTable.processJobUpdate(jobUpdate);
else else
this.$refs.jobsTable.processNewJob(jobUpdate); this.$refs.jobsTable.processNewJob(jobUpdate);
} }
if (this.jobID == jobUpdate.id) if (this.jobID != jobUpdate.id)
return;
this._fetchJob(this.jobID); this._fetchJob(this.jobID);
if (jobUpdate.refresh_tasks) {
if (this.$refs.tasksTable)
this.$refs.tasksTable.fetchTasks();
this._fetchTask(this.taskID);
}
}, },
/** /**