Manager: implement API endpoint for changing job priority

The priority of an existing can now be changed. It will be taken into
account when assigning tasks to workers, but it will not reassign tasks
that are already active.
This commit is contained in:
Sybren A. Stüvel 2022-09-30 16:29:11 +02:00
parent c6ede93fc1
commit 85d53de1f9
6 changed files with 148 additions and 0 deletions

View File

@ -7,6 +7,7 @@ bugs in actually-released versions.
## 3.1 - in development
- Web interface: make the worker IP address clickable; it will be copied to the clipboard when clicked.
- Add API operation to change the priority of an existing job.
## 3.0 - released 2022-09-12

View File

@ -31,6 +31,7 @@ type PersistenceService interface {
StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error
// FetchJob fetches a single job, without fetching its tasks.
FetchJob(ctx context.Context, jobID string) (*persistence.Job, error)
SaveJobPriority(ctx context.Context, job *persistence.Job) error
// FetchTask fetches the given task and the accompanying job.
FetchTask(ctx context.Context, taskID string) (*persistence.Task, error)
FetchTaskFailureList(context.Context, *persistence.Task) ([]*persistence.Worker, error)
@ -97,6 +98,7 @@ var _ TaskStateMachine = (*task_state_machine.StateMachine)(nil)
type ChangeBroadcaster interface {
// BroadcastNewJob sends a 'new job' notification to all SocketIO clients.
BroadcastNewJob(jobUpdate api.SocketIOJobUpdate)
BroadcastJobUpdate(jobUpdate api.SocketIOJobUpdate)
BroadcastLastRenderedImage(update api.SocketIOLastRenderedUpdate)
// Note that there is no BroadcastNewTask. The 'new job' broadcast is sent

View File

@ -178,6 +178,53 @@ func (f *Flamenco) SetJobStatus(e echo.Context, jobID string) error {
return e.NoContent(http.StatusNoContent)
}
// SetJobPriority is used by the web interface to change a job's priority.
func (f *Flamenco) SetJobPriority(e echo.Context, jobID string) error {
logger := requestLogger(e)
ctx := e.Request().Context()
logger = logger.With().Str("job", jobID).Logger()
var prioChange api.SetJobPriorityJSONRequestBody
if err := e.Bind(&prioChange); err != nil {
logger.Warn().Err(err).Msg("bad request received")
return sendAPIError(e, http.StatusBadRequest, "invalid format")
}
dbJob, err := f.persist.FetchJob(ctx, jobID)
if err != nil {
if errors.Is(err, persistence.ErrJobNotFound) {
return sendAPIError(e, http.StatusNotFound, "no such job")
}
logger.Error().Err(err).Msg("error fetching job")
return sendAPIError(e, http.StatusInternalServerError, "error fetching job")
}
logger = logger.With().
Str("jobName", dbJob.Name).
Int("prioCurrent", dbJob.Priority).
Int("prioRequested", prioChange.Priority).
Logger()
logger.Info().Msg("job priority change requested")
// From here on, the request can be handled even when the client disconnects.
bgCtx, bgCtxCancel := bgContext()
defer bgCtxCancel()
dbJob.Priority = prioChange.Priority
err = f.persist.SaveJobPriority(bgCtx, dbJob)
if err != nil {
logger.Error().Err(err).Msg("error changing job priority")
return sendAPIError(e, http.StatusInternalServerError, "unexpected error changing job priority")
}
// Broadcast this change to the SocketIO clients.
jobUpdate := webupdates.NewJobUpdate(dbJob)
f.broadcaster.BroadcastJobUpdate(jobUpdate)
return e.NoContent(http.StatusNoContent)
}
// SetTaskStatus is used by the web interface to change a task's status.
func (f *Flamenco) SetTaskStatus(e echo.Context, taskID string) error {
logger := requestLogger(e)

View File

@ -353,6 +353,67 @@ func TestSetJobStatus_happy(t *testing.T) {
assertResponseNoContent(t, echoCtx)
}
func TestSetJobPrio_nonexistentJob(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mf := newMockedFlamenco(mockCtrl)
jobID := "18a9b096-d77e-438c-9be2-74397038298b"
prioUpdate := api.JobPriorityChange{Priority: 47}
mf.persistence.EXPECT().FetchJob(gomock.Any(), jobID).Return(nil, persistence.ErrJobNotFound)
// Do the call.
echoCtx := mf.prepareMockedJSONRequest(prioUpdate)
err := mf.flamenco.SetJobStatus(echoCtx, jobID)
assert.NoError(t, err)
assertResponseAPIError(t, echoCtx, http.StatusNotFound, "no such job")
}
func TestSetJobPrio(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mf := newMockedFlamenco(mockCtrl)
jobID := "18a9b096-d77e-438c-9be2-74397038298b"
prioUpdate := api.JobPriorityChange{Priority: 47}
dbJob := persistence.Job{
UUID: jobID,
Name: "test job",
Priority: 50,
Settings: persistence.StringInterfaceMap{},
Metadata: persistence.StringStringMap{},
}
echoCtx := mf.prepareMockedJSONRequest(prioUpdate)
// Set up expectations.
ctx := echoCtx.Request().Context()
mf.persistence.EXPECT().FetchJob(ctx, jobID).Return(&dbJob, nil).AnyTimes()
jobWithNewPrio := dbJob
jobWithNewPrio.Priority = 47
mf.persistence.EXPECT().SaveJobPriority(gomock.Not(ctx), &jobWithNewPrio)
// Expect the change to be broadcast over SocketIO.
expectUpdate := api.SocketIOJobUpdate{
Id: dbJob.UUID,
Name: &dbJob.Name,
RefreshTasks: false,
Priority: prioUpdate.Priority,
Status: dbJob.Status,
Updated: dbJob.UpdatedAt,
}
mf.broadcaster.EXPECT().BroadcastJobUpdate(expectUpdate)
err := mf.flamenco.SetJobPriority(echoCtx, jobID)
assert.NoError(t, err)
assertResponseNoContent(t, echoCtx)
}
func TestSetJobStatusFailedToRequeueing(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

View File

@ -319,6 +319,20 @@ func (mr *MockPersistenceServiceMockRecorder) RemoveFromJobBlocklist(arg0, arg1,
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveFromJobBlocklist", reflect.TypeOf((*MockPersistenceService)(nil).RemoveFromJobBlocklist), arg0, arg1, arg2, arg3)
}
// SaveJobPriority mocks base method.
func (m *MockPersistenceService) SaveJobPriority(arg0 context.Context, arg1 *persistence.Job) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SaveJobPriority", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// SaveJobPriority indicates an expected call of SaveJobPriority.
func (mr *MockPersistenceServiceMockRecorder) SaveJobPriority(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveJobPriority", reflect.TypeOf((*MockPersistenceService)(nil).SaveJobPriority), arg0, arg1)
}
// SaveTask mocks base method.
func (m *MockPersistenceService) SaveTask(arg0 context.Context, arg1 *persistence.Task) error {
m.ctrl.T.Helper()
@ -484,6 +498,18 @@ func (m *MockChangeBroadcaster) EXPECT() *MockChangeBroadcasterMockRecorder {
return m.recorder
}
// BroadcastJobUpdate mocks base method.
func (m *MockChangeBroadcaster) BroadcastJobUpdate(arg0 api.SocketIOJobUpdate) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "BroadcastJobUpdate", arg0)
}
// BroadcastJobUpdate indicates an expected call of BroadcastJobUpdate.
func (mr *MockChangeBroadcasterMockRecorder) BroadcastJobUpdate(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastJobUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastJobUpdate), arg0)
}
// BroadcastLastRenderedImage mocks base method.
func (m *MockChangeBroadcaster) BroadcastLastRenderedImage(arg0 api.SocketIOLastRenderedUpdate) {
m.ctrl.T.Helper()

View File

@ -234,6 +234,17 @@ func (db *DB) SaveJobStatus(ctx context.Context, j *Job) error {
return nil
}
// SaveJobPriority saves the job's Priority field.
func (db *DB) SaveJobPriority(ctx context.Context, j *Job) error {
tx := db.gormDB.WithContext(ctx).
Model(j).
Updates(Job{Priority: j.Priority})
if tx.Error != nil {
return jobError(tx.Error, "saving job priority")
}
return nil
}
func (db *DB) FetchTask(ctx context.Context, taskUUID string) (*Task, error) {
dbTask := Task{}
tx := db.gormDB.WithContext(ctx).