From 3d854078ba932002fb57302f814205cf393dd641 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Fri, 25 Feb 2022 16:30:27 +0100 Subject: [PATCH] Manager: integrate task state machine into API implementation --- cmd/flamenco-manager-poc/main.go | 4 +- internal/manager/api_impl/api_impl.go | 47 ++++++++----- internal/manager/api_impl/jobs.go | 24 ++++--- internal/manager/api_impl/jobs_test.go | 37 +++++++--- .../api_impl/mocks/api_impl_mock.gen.go | 67 ++++++++++++++++++- internal/manager/persistence/jobs.go | 7 ++ .../task_state_machine/task_state_machine.go | 1 + 7 files changed, 148 insertions(+), 39 deletions(-) diff --git a/cmd/flamenco-manager-poc/main.go b/cmd/flamenco-manager-poc/main.go index ef3f6858..a2f1b49f 100644 --- a/cmd/flamenco-manager-poc/main.go +++ b/cmd/flamenco-manager-poc/main.go @@ -43,6 +43,7 @@ import ( "gitlab.com/blender/flamenco-ng-poc/internal/manager/persistence" "gitlab.com/blender/flamenco-ng-poc/internal/manager/swagger_ui" "gitlab.com/blender/flamenco-ng-poc/internal/manager/task_logs" + "gitlab.com/blender/flamenco-ng-poc/internal/manager/task_state_machine" "gitlab.com/blender/flamenco-ng-poc/pkg/api" ) @@ -91,7 +92,8 @@ func main() { log.Fatal().Err(err).Msg("error loading job compilers") } logStorage := task_logs.NewStorage(configService.Get().TaskLogsPath) - flamenco := api_impl.NewFlamenco(compiler, persist, logStorage, configService) + taskStateMachine := task_state_machine.NewStateMachine(persist) + flamenco := api_impl.NewFlamenco(compiler, persist, logStorage, configService, taskStateMachine) e := buildWebService(flamenco, persist) // Start the web server. diff --git a/internal/manager/api_impl/api_impl.go b/internal/manager/api_impl/api_impl.go index 7cb99777..79f3467a 100644 --- a/internal/manager/api_impl/api_impl.go +++ b/internal/manager/api_impl/api_impl.go @@ -29,18 +29,22 @@ import ( "github.com/rs/zerolog" "gitlab.com/blender/flamenco-ng-poc/internal/manager/job_compilers" "gitlab.com/blender/flamenco-ng-poc/internal/manager/persistence" + "gitlab.com/blender/flamenco-ng-poc/internal/manager/task_state_machine" "gitlab.com/blender/flamenco-ng-poc/pkg/api" ) type Flamenco struct { - jobCompiler JobCompiler - persist PersistenceService - logStorage LogStorage - config ConfigService + jobCompiler JobCompiler + persist PersistenceService + logStorage LogStorage + config ConfigService + stateMachine TaskStateMachine } +var _ api.ServerInterface = (*Flamenco)(nil) + // Generate mock implementations of these interfaces. -//go:generate go run github.com/golang/mock/mockgen -destination mocks/api_impl_mock.gen.go -package mocks gitlab.com/blender/flamenco-ng-poc/internal/manager/api_impl PersistenceService,JobCompiler,LogStorage,ConfigService +//go:generate go run github.com/golang/mock/mockgen -destination mocks/api_impl_mock.gen.go -package mocks gitlab.com/blender/flamenco-ng-poc/internal/manager/api_impl PersistenceService,JobCompiler,LogStorage,ConfigService,TaskStateMachine type PersistenceService interface { StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error @@ -48,6 +52,7 @@ type PersistenceService interface { // FetchTask fetches the given task and the accompanying job. FetchTask(ctx context.Context, taskID string) (*persistence.Task, error) SaveTask(ctx context.Context, task *persistence.Task) error + SaveTaskActivity(ctx context.Context, t *persistence.Task) error CreateWorker(ctx context.Context, w *persistence.Worker) error FetchWorker(ctx context.Context, uuid string) (*persistence.Worker, error) @@ -59,12 +64,19 @@ type PersistenceService interface { ScheduleTask(w *persistence.Worker) (*persistence.Task, error) } -// TaskStateMachine interfaces task_state_machine.StateMachine. +var _ PersistenceService = (*persistence.DB)(nil) + type TaskStateMachine interface { - IsTaskStatusChangeValid(task *persistence.Task, newStatus api.TaskStatus) bool + // TaskStatusChange gives a Task a new status, and handles the resulting status changes on the job. TaskStatusChange(ctx context.Context, task *persistence.Task, newStatus api.TaskStatus) error + + // JobStatusChange gives a Job a new status, and handles the resulting status changes on its tasks. + JobStatusChange(ctx context.Context, job *persistence.Job, newJobStatus api.JobStatus) error } +// TaskStateMachine should be a subset of task_state_machine.StateMachine. +var _ TaskStateMachine = (*task_state_machine.StateMachine)(nil) + type JobCompiler interface { ListJobTypes() api.AvailableJobTypes Compile(ctx context.Context, job api.SubmittedJob) (*job_compilers.AuthoredJob, error) @@ -80,15 +92,20 @@ type ConfigService interface { VariableReplacer } -var _ api.ServerInterface = (*Flamenco)(nil) - -// NewFlamenco creates a new Flamenco service, using the given JobCompiler. -func NewFlamenco(jc JobCompiler, jps PersistenceService, ls LogStorage, cs ConfigService) *Flamenco { +// NewFlamenco creates a new Flamenco service. +func NewFlamenco( + jc JobCompiler, + jps PersistenceService, + ls LogStorage, + cs ConfigService, + sm TaskStateMachine, +) *Flamenco { return &Flamenco{ - jobCompiler: jc, - persist: jps, - logStorage: ls, - config: cs, + jobCompiler: jc, + persist: jps, + logStorage: ls, + config: cs, + stateMachine: sm, } } diff --git a/internal/manager/api_impl/jobs.go b/internal/manager/api_impl/jobs.go index 87e7ff15..be8ffe22 100644 --- a/internal/manager/api_impl/jobs.go +++ b/internal/manager/api_impl/jobs.go @@ -184,24 +184,26 @@ func (f *Flamenco) doTaskUpdate( logger.Panic().Msg("dbTask.Job is nil, unable to continue") } + var dbErr error + if update.TaskStatus != nil { - // TODO: check that this status transition is valid. - // TODO: process this status transition. - newStatus := *update.TaskStatus - logger.Info(). - Str("oldStatus", string(dbTask.Status)). - Str("newStatus", string(newStatus)). - Msg("task changing status") - dbTask.Status = newStatus + oldTaskStatus := dbTask.Status + err := f.stateMachine.TaskStatusChange(ctx, dbTask, *update.TaskStatus) + if err != nil { + logger.Error().Err(err). + Str("newTaskStatus", string(*update.TaskStatus)). + Str("oldTaskStatus", string(oldTaskStatus)). + Msg("error changing task status") + dbErr = fmt.Errorf("changing status of task %s to %q: %w", + dbTask.UUID, *update.TaskStatus, err) + } } if update.Activity != nil { dbTask.Activity = *update.Activity + dbErr = f.persist.SaveTaskActivity(ctx, dbTask) } - // Do the database persistence first, as that's more important than the logging. - dbErr := f.persist.SaveTask(ctx, dbTask) - if update.Log != nil { // Errors writing the log to file should be logged in our own logging // system, but shouldn't abort the render. As such, `err` is not returned to diff --git a/internal/manager/api_impl/jobs_test.go b/internal/manager/api_impl/jobs_test.go index 3c82ef54..b8eb679e 100644 --- a/internal/manager/api_impl/jobs_test.go +++ b/internal/manager/api_impl/jobs_test.go @@ -30,6 +30,10 @@ import ( "gitlab.com/blender/flamenco-ng-poc/pkg/api" ) +func ptr[T any](value T) *T { + return &value +} + func TestTaskUpdate(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() @@ -38,12 +42,10 @@ func TestTaskUpdate(t *testing.T) { worker := testWorker() // Construct the JSON request object. - s := func(value string) *string { return &value } - ts := func(value api.TaskStatus) *api.TaskStatus { return &value } taskUpdate := api.TaskUpdateJSONRequestBody{ - Activity: s("testing"), - Log: s("line1\nline2\n"), - TaskStatus: ts(api.TaskStatusFailed), + Activity: ptr("testing"), + Log: ptr("line1\nline2\n"), + TaskStatus: ptr(api.TaskStatusFailed), } // Construct the task that's supposed to be updated. @@ -55,18 +57,28 @@ func TestTaskUpdate(t *testing.T) { Worker: &worker, WorkerID: &worker.ID, Job: &mockJob, + Activity: "pre-update activity", } // Expect the task to be fetched. mf.persistence.EXPECT().FetchTask(gomock.Any(), taskID).Return(&mockTask, nil) - // Expect the task to be saved. - var savedTask persistence.Task - mf.persistence.EXPECT().SaveTask(gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, task *persistence.Task) error { - savedTask = *task + // Expect the task status change to be handed to the state machine. + var statusChangedtask persistence.Task + mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), gomock.AssignableToTypeOf(&persistence.Task{}), api.TaskStatusFailed). + DoAndReturn(func(ctx context.Context, task *persistence.Task, newStatus api.TaskStatus) error { + statusChangedtask = *task return nil }) + + // Expect the activity to be updated. + var actUpdatedTask persistence.Task + mf.persistence.EXPECT().SaveTaskActivity(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, task *persistence.Task) error { + actUpdatedTask = *task + return nil + }) + // Expect the log to be written. mf.logStorage.EXPECT().Write(gomock.Any(), jobID, taskID, "line1\nline2\n") @@ -76,5 +88,8 @@ func TestTaskUpdate(t *testing.T) { // Check the saved task. assert.NoError(t, err) - assert.Equal(t, mockTask.UUID, savedTask.UUID) + assert.Equal(t, mockTask.UUID, statusChangedtask.UUID) + assert.Equal(t, mockTask.UUID, actUpdatedTask.UUID) + assert.Equal(t, "pre-update activity", statusChangedtask.Activity) // the 'save' should come from the change in status. + assert.Equal(t, "testing", actUpdatedTask.Activity) // the activity should be saved separately. } diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index d14b01e8..22da51f3 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: gitlab.com/blender/flamenco-ng-poc/internal/manager/api_impl (interfaces: PersistenceService,JobCompiler,LogStorage,ConfigService) +// Source: gitlab.com/blender/flamenco-ng-poc/internal/manager/api_impl (interfaces: PersistenceService,JobCompiler,LogStorage,ConfigService,TaskStateMachine) // Package mocks is a generated GoMock package. package mocks @@ -112,6 +112,20 @@ func (mr *MockPersistenceServiceMockRecorder) SaveTask(arg0, arg1 interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveTask", reflect.TypeOf((*MockPersistenceService)(nil).SaveTask), arg0, arg1) } +// SaveTaskActivity mocks base method. +func (m *MockPersistenceService) SaveTaskActivity(arg0 context.Context, arg1 *persistence.Task) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SaveTaskActivity", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// SaveTaskActivity indicates an expected call of SaveTaskActivity. +func (mr *MockPersistenceServiceMockRecorder) SaveTaskActivity(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveTaskActivity", reflect.TypeOf((*MockPersistenceService)(nil).SaveTaskActivity), arg0, arg1) +} + // SaveWorker mocks base method. func (m *MockPersistenceService) SaveWorker(arg0 context.Context, arg1 *persistence.Worker) error { m.ctrl.T.Helper() @@ -306,3 +320,54 @@ func (mr *MockConfigServiceMockRecorder) ExpandVariables(arg0, arg1, arg2 interf mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExpandVariables", reflect.TypeOf((*MockConfigService)(nil).ExpandVariables), arg0, arg1, arg2) } + +// MockTaskStateMachine is a mock of TaskStateMachine interface. +type MockTaskStateMachine struct { + ctrl *gomock.Controller + recorder *MockTaskStateMachineMockRecorder +} + +// MockTaskStateMachineMockRecorder is the mock recorder for MockTaskStateMachine. +type MockTaskStateMachineMockRecorder struct { + mock *MockTaskStateMachine +} + +// NewMockTaskStateMachine creates a new mock instance. +func NewMockTaskStateMachine(ctrl *gomock.Controller) *MockTaskStateMachine { + mock := &MockTaskStateMachine{ctrl: ctrl} + mock.recorder = &MockTaskStateMachineMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTaskStateMachine) EXPECT() *MockTaskStateMachineMockRecorder { + return m.recorder +} + +// JobStatusChange mocks base method. +func (m *MockTaskStateMachine) JobStatusChange(arg0 context.Context, arg1 *persistence.Job, arg2 api.JobStatus) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "JobStatusChange", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// JobStatusChange indicates an expected call of JobStatusChange. +func (mr *MockTaskStateMachineMockRecorder) JobStatusChange(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "JobStatusChange", reflect.TypeOf((*MockTaskStateMachine)(nil).JobStatusChange), arg0, arg1, arg2) +} + +// TaskStatusChange mocks base method. +func (m *MockTaskStateMachine) TaskStatusChange(arg0 context.Context, arg1 *persistence.Task, arg2 api.TaskStatus) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TaskStatusChange", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// TaskStatusChange indicates an expected call of TaskStatusChange. +func (mr *MockTaskStateMachineMockRecorder) TaskStatusChange(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TaskStatusChange", reflect.TypeOf((*MockTaskStateMachine)(nil).TaskStatusChange), arg0, arg1, arg2) +} diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index 7b079c27..e68489ac 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -220,6 +220,13 @@ func (db *DB) SaveTask(ctx context.Context, t *Task) error { return nil } +func (db *DB) SaveTaskActivity(ctx context.Context, t *Task) error { + if err := db.gormDB.Model(t).Updates(Task{Activity: t.Activity}).Error; err != nil { + return fmt.Errorf("error saving task activity: %w", err) + } + return nil +} + func (db *DB) JobHasTasksInStatus(ctx context.Context, job *Job, taskStatus api.TaskStatus) (bool, error) { var numTasksInStatus int64 tx := db.gormDB.Model(&Task{}). diff --git a/internal/manager/task_state_machine/task_state_machine.go b/internal/manager/task_state_machine/task_state_machine.go index 3e6c3288..d52df002 100644 --- a/internal/manager/task_state_machine/task_state_machine.go +++ b/internal/manager/task_state_machine/task_state_machine.go @@ -212,6 +212,7 @@ func (sm *StateMachine) updateJobAfterTaskStatusChange( } } +// JobStatusChange gives a Job a new status, and handles the resulting status changes on its tasks. func (sm *StateMachine) JobStatusChange(ctx context.Context, job *persistence.Job, newJobStatus api.JobStatus) error { // Job status changes can trigger task status changes, which can trigger the // next job status change. Keep looping over these job status changes until