diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index 504ee300..a5bb4b6c 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -247,8 +247,22 @@ func (db *DB) FetchTask(ctx context.Context, taskUUID string) (*Task, error) { } func (db *DB) SaveTask(ctx context.Context, t *Task) error { - if err := db.gormDB.WithContext(ctx).Save(t).Error; err != nil { - return taskError(err, "saving task") + tx := db.gormDB.WithContext(ctx). + Omit("job"). + Omit("worker"). + Save(t) + if tx.Error != nil { + return taskError(tx.Error, "saving task") + } + return nil +} + +func (db *DB) SaveTaskStatus(ctx context.Context, t *Task) error { + tx := db.gormDB.WithContext(ctx). + Select("Status"). + Save(t) + if tx.Error != nil { + return taskError(tx.Error, "saving task") } return nil } @@ -265,7 +279,9 @@ func (db *DB) SaveTaskActivity(ctx context.Context, t *Task) error { func (db *DB) TaskAssignToWorker(ctx context.Context, t *Task, w *Worker) error { tx := db.gormDB.WithContext(ctx). - Model(t).Updates(Task{WorkerID: &w.ID}) + Model(t). + Select("WorkerID"). + Updates(Task{WorkerID: &w.ID}) if tx.Error != nil { return taskError(tx.Error, "assigning task %s to worker %s", t.UUID, w.UUID) } diff --git a/internal/manager/persistence/task_scheduler.go b/internal/manager/persistence/task_scheduler.go index cc5c714c..15e6af7f 100644 --- a/internal/manager/persistence/task_scheduler.go +++ b/internal/manager/persistence/task_scheduler.go @@ -149,5 +149,6 @@ func findTaskForWorker(tx *gorm.DB, w *Worker) (*Task, error) { func assignTaskToWorker(tx *gorm.DB, w *Worker, t *Task) error { return tx.Model(t). + Select("WorkerID", "LastTouchedAt"). Updates(Task{WorkerID: &w.ID, LastTouchedAt: tx.NowFunc()}).Error } diff --git a/internal/manager/task_state_machine/interfaces.go b/internal/manager/task_state_machine/interfaces.go index 39d1a954..5da02774 100644 --- a/internal/manager/task_state_machine/interfaces.go +++ b/internal/manager/task_state_machine/interfaces.go @@ -17,6 +17,7 @@ import ( type PersistenceService interface { SaveTask(ctx context.Context, task *persistence.Task) error + SaveTaskStatus(ctx context.Context, t *persistence.Task) error SaveTaskActivity(ctx context.Context, t *persistence.Task) error SaveJobStatus(ctx context.Context, j *persistence.Job) error diff --git a/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go b/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go index 92503fe4..e288324d 100644 --- a/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go +++ b/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go @@ -165,6 +165,20 @@ func (mr *MockPersistenceServiceMockRecorder) SaveTaskActivity(arg0, arg1 interf return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveTaskActivity", reflect.TypeOf((*MockPersistenceService)(nil).SaveTaskActivity), arg0, arg1) } +// SaveTaskStatus mocks base method. +func (m *MockPersistenceService) SaveTaskStatus(arg0 context.Context, arg1 *persistence.Task) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SaveTaskStatus", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// SaveTaskStatus indicates an expected call of SaveTaskStatus. +func (mr *MockPersistenceServiceMockRecorder) SaveTaskStatus(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveTaskStatus", reflect.TypeOf((*MockPersistenceService)(nil).SaveTaskStatus), arg0, arg1) +} + // UpdateJobsTaskStatuses mocks base method. func (m *MockPersistenceService) UpdateJobsTaskStatuses(arg0 context.Context, arg1 *persistence.Job, arg2 api.TaskStatus, arg3 string) error { m.ctrl.T.Helper() diff --git a/internal/manager/task_state_machine/task_state_machine.go b/internal/manager/task_state_machine/task_state_machine.go index 08ef5135..915a4eca 100644 --- a/internal/manager/task_state_machine/task_state_machine.go +++ b/internal/manager/task_state_machine/task_state_machine.go @@ -76,7 +76,7 @@ func (sm *StateMachine) taskStatusChangeOnly( Logger() logger.Debug().Msg("task state changed") - if err := sm.persist.SaveTask(ctx, task); err != nil { + if err := sm.persist.SaveTaskStatus(ctx, task); err != nil { return fmt.Errorf("saving task to database: %w", err) } diff --git a/internal/manager/task_state_machine/task_state_machine_test.go b/internal/manager/task_state_machine/task_state_machine_test.go index ad2898b0..403c6e91 100644 --- a/internal/manager/task_state_machine/task_state_machine_test.go +++ b/internal/manager/task_state_machine/task_state_machine_test.go @@ -373,7 +373,7 @@ func (m *StateMachineMocks) expectSaveTaskWithStatus( expectTaskStatus api.TaskStatus, ) *gomock.Call { return m.persist.EXPECT(). - SaveTask(gomock.Any(), task). + SaveTaskStatus(gomock.Any(), task). DoAndReturn(func(ctx context.Context, savedTask *persistence.Task) error { assert.Equal(t, expectTaskStatus, savedTask.Status) return nil diff --git a/internal/manager/task_state_machine/worker_requeue_test.go b/internal/manager/task_state_machine/worker_requeue_test.go index ad0070e8..0a1b4544 100644 --- a/internal/manager/task_state_machine/worker_requeue_test.go +++ b/internal/manager/task_state_machine/worker_requeue_test.go @@ -32,8 +32,8 @@ func TestRequeueActiveTasksOfWorker(t *testing.T) { // Expect this re-queueing to end up in the task's log and activity. mocks.persist.EXPECT().SaveTaskActivity(ctx, task1) // TODO: test saved activity value mocks.persist.EXPECT().SaveTaskActivity(ctx, task2) // TODO: test saved activity value - mocks.persist.EXPECT().SaveTask(ctx, task1) // TODO: test saved task status - mocks.persist.EXPECT().SaveTask(ctx, task2) // TODO: test saved task status + mocks.persist.EXPECT().SaveTaskStatus(ctx, task1) // TODO: test saved task status + mocks.persist.EXPECT().SaveTaskStatus(ctx, task2) // TODO: test saved task status logMsg := "Task was requeued by Manager because worker had to test" mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), task1.Job.UUID, task1.UUID, logMsg)