diff --git a/CHANGELOG.md b/CHANGELOG.md index b23fed64..7a3b16ec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ bugs in actually-released versions. ## 3.1 - in development - Web interface: make the worker IP address clickable; it will be copied to the clipboard when clicked. +- Add API operation to change the priority of an existing job. ## 3.0 - released 2022-09-12 diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index 103b7c93..0b9661a3 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -31,6 +31,7 @@ type PersistenceService interface { StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error // FetchJob fetches a single job, without fetching its tasks. FetchJob(ctx context.Context, jobID string) (*persistence.Job, error) + SaveJobPriority(ctx context.Context, job *persistence.Job) error // FetchTask fetches the given task and the accompanying job. FetchTask(ctx context.Context, taskID string) (*persistence.Task, error) FetchTaskFailureList(context.Context, *persistence.Task) ([]*persistence.Worker, error) @@ -97,6 +98,7 @@ var _ TaskStateMachine = (*task_state_machine.StateMachine)(nil) type ChangeBroadcaster interface { // BroadcastNewJob sends a 'new job' notification to all SocketIO clients. BroadcastNewJob(jobUpdate api.SocketIOJobUpdate) + BroadcastJobUpdate(jobUpdate api.SocketIOJobUpdate) BroadcastLastRenderedImage(update api.SocketIOLastRenderedUpdate) // Note that there is no BroadcastNewTask. The 'new job' broadcast is sent diff --git a/internal/manager/api_impl/jobs.go b/internal/manager/api_impl/jobs.go index b032f9c3..25ebadfe 100644 --- a/internal/manager/api_impl/jobs.go +++ b/internal/manager/api_impl/jobs.go @@ -178,6 +178,53 @@ func (f *Flamenco) SetJobStatus(e echo.Context, jobID string) error { return e.NoContent(http.StatusNoContent) } +// SetJobPriority is used by the web interface to change a job's priority. +func (f *Flamenco) SetJobPriority(e echo.Context, jobID string) error { + logger := requestLogger(e) + ctx := e.Request().Context() + + logger = logger.With().Str("job", jobID).Logger() + + var prioChange api.SetJobPriorityJSONRequestBody + if err := e.Bind(&prioChange); err != nil { + logger.Warn().Err(err).Msg("bad request received") + return sendAPIError(e, http.StatusBadRequest, "invalid format") + } + + dbJob, err := f.persist.FetchJob(ctx, jobID) + if err != nil { + if errors.Is(err, persistence.ErrJobNotFound) { + return sendAPIError(e, http.StatusNotFound, "no such job") + } + logger.Error().Err(err).Msg("error fetching job") + return sendAPIError(e, http.StatusInternalServerError, "error fetching job") + } + + logger = logger.With(). + Str("jobName", dbJob.Name). + Int("prioCurrent", dbJob.Priority). + Int("prioRequested", prioChange.Priority). + Logger() + logger.Info().Msg("job priority change requested") + + // From here on, the request can be handled even when the client disconnects. + bgCtx, bgCtxCancel := bgContext() + defer bgCtxCancel() + + dbJob.Priority = prioChange.Priority + err = f.persist.SaveJobPriority(bgCtx, dbJob) + if err != nil { + logger.Error().Err(err).Msg("error changing job priority") + return sendAPIError(e, http.StatusInternalServerError, "unexpected error changing job priority") + } + + // Broadcast this change to the SocketIO clients. + jobUpdate := webupdates.NewJobUpdate(dbJob) + f.broadcaster.BroadcastJobUpdate(jobUpdate) + + return e.NoContent(http.StatusNoContent) +} + // SetTaskStatus is used by the web interface to change a task's status. func (f *Flamenco) SetTaskStatus(e echo.Context, taskID string) error { logger := requestLogger(e) diff --git a/internal/manager/api_impl/jobs_test.go b/internal/manager/api_impl/jobs_test.go index 140d4716..eef218af 100644 --- a/internal/manager/api_impl/jobs_test.go +++ b/internal/manager/api_impl/jobs_test.go @@ -353,6 +353,67 @@ func TestSetJobStatus_happy(t *testing.T) { assertResponseNoContent(t, echoCtx) } +func TestSetJobPrio_nonexistentJob(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mf := newMockedFlamenco(mockCtrl) + + jobID := "18a9b096-d77e-438c-9be2-74397038298b" + prioUpdate := api.JobPriorityChange{Priority: 47} + + mf.persistence.EXPECT().FetchJob(gomock.Any(), jobID).Return(nil, persistence.ErrJobNotFound) + + // Do the call. + echoCtx := mf.prepareMockedJSONRequest(prioUpdate) + err := mf.flamenco.SetJobStatus(echoCtx, jobID) + assert.NoError(t, err) + + assertResponseAPIError(t, echoCtx, http.StatusNotFound, "no such job") +} + +func TestSetJobPrio(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mf := newMockedFlamenco(mockCtrl) + + jobID := "18a9b096-d77e-438c-9be2-74397038298b" + prioUpdate := api.JobPriorityChange{Priority: 47} + dbJob := persistence.Job{ + UUID: jobID, + Name: "test job", + Priority: 50, + Settings: persistence.StringInterfaceMap{}, + Metadata: persistence.StringStringMap{}, + } + + echoCtx := mf.prepareMockedJSONRequest(prioUpdate) + + // Set up expectations. + ctx := echoCtx.Request().Context() + mf.persistence.EXPECT().FetchJob(ctx, jobID).Return(&dbJob, nil).AnyTimes() + jobWithNewPrio := dbJob + jobWithNewPrio.Priority = 47 + mf.persistence.EXPECT().SaveJobPriority(gomock.Not(ctx), &jobWithNewPrio) + + // Expect the change to be broadcast over SocketIO. + expectUpdate := api.SocketIOJobUpdate{ + Id: dbJob.UUID, + Name: &dbJob.Name, + RefreshTasks: false, + Priority: prioUpdate.Priority, + Status: dbJob.Status, + Updated: dbJob.UpdatedAt, + } + mf.broadcaster.EXPECT().BroadcastJobUpdate(expectUpdate) + + err := mf.flamenco.SetJobPriority(echoCtx, jobID) + assert.NoError(t, err) + + assertResponseNoContent(t, echoCtx) +} + func TestSetJobStatusFailedToRequeueing(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index 3a5b5858..b98f9bd5 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -319,6 +319,20 @@ func (mr *MockPersistenceServiceMockRecorder) RemoveFromJobBlocklist(arg0, arg1, return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveFromJobBlocklist", reflect.TypeOf((*MockPersistenceService)(nil).RemoveFromJobBlocklist), arg0, arg1, arg2, arg3) } +// SaveJobPriority mocks base method. +func (m *MockPersistenceService) SaveJobPriority(arg0 context.Context, arg1 *persistence.Job) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SaveJobPriority", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// SaveJobPriority indicates an expected call of SaveJobPriority. +func (mr *MockPersistenceServiceMockRecorder) SaveJobPriority(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveJobPriority", reflect.TypeOf((*MockPersistenceService)(nil).SaveJobPriority), arg0, arg1) +} + // SaveTask mocks base method. func (m *MockPersistenceService) SaveTask(arg0 context.Context, arg1 *persistence.Task) error { m.ctrl.T.Helper() @@ -484,6 +498,18 @@ func (m *MockChangeBroadcaster) EXPECT() *MockChangeBroadcasterMockRecorder { return m.recorder } +// BroadcastJobUpdate mocks base method. +func (m *MockChangeBroadcaster) BroadcastJobUpdate(arg0 api.SocketIOJobUpdate) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "BroadcastJobUpdate", arg0) +} + +// BroadcastJobUpdate indicates an expected call of BroadcastJobUpdate. +func (mr *MockChangeBroadcasterMockRecorder) BroadcastJobUpdate(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastJobUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastJobUpdate), arg0) +} + // BroadcastLastRenderedImage mocks base method. func (m *MockChangeBroadcaster) BroadcastLastRenderedImage(arg0 api.SocketIOLastRenderedUpdate) { m.ctrl.T.Helper() diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index 43a0c640..e40bdf76 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -234,6 +234,17 @@ func (db *DB) SaveJobStatus(ctx context.Context, j *Job) error { return nil } +// SaveJobPriority saves the job's Priority field. +func (db *DB) SaveJobPriority(ctx context.Context, j *Job) error { + tx := db.gormDB.WithContext(ctx). + Model(j). + Updates(Job{Priority: j.Priority}) + if tx.Error != nil { + return jobError(tx.Error, "saving job priority") + } + return nil +} + func (db *DB) FetchTask(ctx context.Context, taskUUID string) (*Task, error) { dbTask := Task{} tx := db.gormDB.WithContext(ctx).