Manager: more efficient database queries
Be more selective in what's saved to the database to speed some things up. Most importantly, this avoids saving the entire job when a task status is updated or a task is assigned.
This commit is contained in:
parent
be77403114
commit
1fceae3604
@ -247,8 +247,22 @@ func (db *DB) FetchTask(ctx context.Context, taskUUID string) (*Task, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) SaveTask(ctx context.Context, t *Task) error {
|
func (db *DB) SaveTask(ctx context.Context, t *Task) error {
|
||||||
if err := db.gormDB.WithContext(ctx).Save(t).Error; err != nil {
|
tx := db.gormDB.WithContext(ctx).
|
||||||
return taskError(err, "saving task")
|
Omit("job").
|
||||||
|
Omit("worker").
|
||||||
|
Save(t)
|
||||||
|
if tx.Error != nil {
|
||||||
|
return taskError(tx.Error, "saving task")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *DB) SaveTaskStatus(ctx context.Context, t *Task) error {
|
||||||
|
tx := db.gormDB.WithContext(ctx).
|
||||||
|
Select("Status").
|
||||||
|
Save(t)
|
||||||
|
if tx.Error != nil {
|
||||||
|
return taskError(tx.Error, "saving task")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -265,7 +279,9 @@ func (db *DB) SaveTaskActivity(ctx context.Context, t *Task) error {
|
|||||||
|
|
||||||
func (db *DB) TaskAssignToWorker(ctx context.Context, t *Task, w *Worker) error {
|
func (db *DB) TaskAssignToWorker(ctx context.Context, t *Task, w *Worker) error {
|
||||||
tx := db.gormDB.WithContext(ctx).
|
tx := db.gormDB.WithContext(ctx).
|
||||||
Model(t).Updates(Task{WorkerID: &w.ID})
|
Model(t).
|
||||||
|
Select("WorkerID").
|
||||||
|
Updates(Task{WorkerID: &w.ID})
|
||||||
if tx.Error != nil {
|
if tx.Error != nil {
|
||||||
return taskError(tx.Error, "assigning task %s to worker %s", t.UUID, w.UUID)
|
return taskError(tx.Error, "assigning task %s to worker %s", t.UUID, w.UUID)
|
||||||
}
|
}
|
||||||
|
@ -149,5 +149,6 @@ func findTaskForWorker(tx *gorm.DB, w *Worker) (*Task, error) {
|
|||||||
|
|
||||||
func assignTaskToWorker(tx *gorm.DB, w *Worker, t *Task) error {
|
func assignTaskToWorker(tx *gorm.DB, w *Worker, t *Task) error {
|
||||||
return tx.Model(t).
|
return tx.Model(t).
|
||||||
|
Select("WorkerID", "LastTouchedAt").
|
||||||
Updates(Task{WorkerID: &w.ID, LastTouchedAt: tx.NowFunc()}).Error
|
Updates(Task{WorkerID: &w.ID, LastTouchedAt: tx.NowFunc()}).Error
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,7 @@ import (
|
|||||||
|
|
||||||
type PersistenceService interface {
|
type PersistenceService interface {
|
||||||
SaveTask(ctx context.Context, task *persistence.Task) error
|
SaveTask(ctx context.Context, task *persistence.Task) error
|
||||||
|
SaveTaskStatus(ctx context.Context, t *persistence.Task) error
|
||||||
SaveTaskActivity(ctx context.Context, t *persistence.Task) error
|
SaveTaskActivity(ctx context.Context, t *persistence.Task) error
|
||||||
SaveJobStatus(ctx context.Context, j *persistence.Job) error
|
SaveJobStatus(ctx context.Context, j *persistence.Job) error
|
||||||
|
|
||||||
|
@ -165,6 +165,20 @@ func (mr *MockPersistenceServiceMockRecorder) SaveTaskActivity(arg0, arg1 interf
|
|||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveTaskActivity", reflect.TypeOf((*MockPersistenceService)(nil).SaveTaskActivity), arg0, arg1)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveTaskActivity", reflect.TypeOf((*MockPersistenceService)(nil).SaveTaskActivity), arg0, arg1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SaveTaskStatus mocks base method.
|
||||||
|
func (m *MockPersistenceService) SaveTaskStatus(arg0 context.Context, arg1 *persistence.Task) error {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "SaveTaskStatus", arg0, arg1)
|
||||||
|
ret0, _ := ret[0].(error)
|
||||||
|
return ret0
|
||||||
|
}
|
||||||
|
|
||||||
|
// SaveTaskStatus indicates an expected call of SaveTaskStatus.
|
||||||
|
func (mr *MockPersistenceServiceMockRecorder) SaveTaskStatus(arg0, arg1 interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveTaskStatus", reflect.TypeOf((*MockPersistenceService)(nil).SaveTaskStatus), arg0, arg1)
|
||||||
|
}
|
||||||
|
|
||||||
// UpdateJobsTaskStatuses mocks base method.
|
// UpdateJobsTaskStatuses mocks base method.
|
||||||
func (m *MockPersistenceService) UpdateJobsTaskStatuses(arg0 context.Context, arg1 *persistence.Job, arg2 api.TaskStatus, arg3 string) error {
|
func (m *MockPersistenceService) UpdateJobsTaskStatuses(arg0 context.Context, arg1 *persistence.Job, arg2 api.TaskStatus, arg3 string) error {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
|
@ -76,7 +76,7 @@ func (sm *StateMachine) taskStatusChangeOnly(
|
|||||||
Logger()
|
Logger()
|
||||||
logger.Debug().Msg("task state changed")
|
logger.Debug().Msg("task state changed")
|
||||||
|
|
||||||
if err := sm.persist.SaveTask(ctx, task); err != nil {
|
if err := sm.persist.SaveTaskStatus(ctx, task); err != nil {
|
||||||
return fmt.Errorf("saving task to database: %w", err)
|
return fmt.Errorf("saving task to database: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -373,7 +373,7 @@ func (m *StateMachineMocks) expectSaveTaskWithStatus(
|
|||||||
expectTaskStatus api.TaskStatus,
|
expectTaskStatus api.TaskStatus,
|
||||||
) *gomock.Call {
|
) *gomock.Call {
|
||||||
return m.persist.EXPECT().
|
return m.persist.EXPECT().
|
||||||
SaveTask(gomock.Any(), task).
|
SaveTaskStatus(gomock.Any(), task).
|
||||||
DoAndReturn(func(ctx context.Context, savedTask *persistence.Task) error {
|
DoAndReturn(func(ctx context.Context, savedTask *persistence.Task) error {
|
||||||
assert.Equal(t, expectTaskStatus, savedTask.Status)
|
assert.Equal(t, expectTaskStatus, savedTask.Status)
|
||||||
return nil
|
return nil
|
||||||
|
@ -32,8 +32,8 @@ func TestRequeueActiveTasksOfWorker(t *testing.T) {
|
|||||||
// Expect this re-queueing to end up in the task's log and activity.
|
// Expect this re-queueing to end up in the task's log and activity.
|
||||||
mocks.persist.EXPECT().SaveTaskActivity(ctx, task1) // TODO: test saved activity value
|
mocks.persist.EXPECT().SaveTaskActivity(ctx, task1) // TODO: test saved activity value
|
||||||
mocks.persist.EXPECT().SaveTaskActivity(ctx, task2) // TODO: test saved activity value
|
mocks.persist.EXPECT().SaveTaskActivity(ctx, task2) // TODO: test saved activity value
|
||||||
mocks.persist.EXPECT().SaveTask(ctx, task1) // TODO: test saved task status
|
mocks.persist.EXPECT().SaveTaskStatus(ctx, task1) // TODO: test saved task status
|
||||||
mocks.persist.EXPECT().SaveTask(ctx, task2) // TODO: test saved task status
|
mocks.persist.EXPECT().SaveTaskStatus(ctx, task2) // TODO: test saved task status
|
||||||
|
|
||||||
logMsg := "Task was requeued by Manager because worker had to test"
|
logMsg := "Task was requeued by Manager because worker had to test"
|
||||||
mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), task1.Job.UUID, task1.UUID, logMsg)
|
mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), task1.Job.UUID, task1.UUID, logMsg)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user