Manager: integrate task state machine into API implementation

This commit is contained in:
Sybren A. Stüvel 2022-02-25 16:30:27 +01:00
parent 17e622ebc3
commit 3d854078ba
7 changed files with 148 additions and 39 deletions

View File

@ -43,6 +43,7 @@ import (
"gitlab.com/blender/flamenco-ng-poc/internal/manager/persistence"
"gitlab.com/blender/flamenco-ng-poc/internal/manager/swagger_ui"
"gitlab.com/blender/flamenco-ng-poc/internal/manager/task_logs"
"gitlab.com/blender/flamenco-ng-poc/internal/manager/task_state_machine"
"gitlab.com/blender/flamenco-ng-poc/pkg/api"
)
@ -91,7 +92,8 @@ func main() {
log.Fatal().Err(err).Msg("error loading job compilers")
}
logStorage := task_logs.NewStorage(configService.Get().TaskLogsPath)
flamenco := api_impl.NewFlamenco(compiler, persist, logStorage, configService)
taskStateMachine := task_state_machine.NewStateMachine(persist)
flamenco := api_impl.NewFlamenco(compiler, persist, logStorage, configService, taskStateMachine)
e := buildWebService(flamenco, persist)
// Start the web server.

View File

@ -29,6 +29,7 @@ import (
"github.com/rs/zerolog"
"gitlab.com/blender/flamenco-ng-poc/internal/manager/job_compilers"
"gitlab.com/blender/flamenco-ng-poc/internal/manager/persistence"
"gitlab.com/blender/flamenco-ng-poc/internal/manager/task_state_machine"
"gitlab.com/blender/flamenco-ng-poc/pkg/api"
)
@ -37,10 +38,13 @@ type Flamenco struct {
persist PersistenceService
logStorage LogStorage
config ConfigService
stateMachine TaskStateMachine
}
var _ api.ServerInterface = (*Flamenco)(nil)
// Generate mock implementations of these interfaces.
//go:generate go run github.com/golang/mock/mockgen -destination mocks/api_impl_mock.gen.go -package mocks gitlab.com/blender/flamenco-ng-poc/internal/manager/api_impl PersistenceService,JobCompiler,LogStorage,ConfigService
//go:generate go run github.com/golang/mock/mockgen -destination mocks/api_impl_mock.gen.go -package mocks gitlab.com/blender/flamenco-ng-poc/internal/manager/api_impl PersistenceService,JobCompiler,LogStorage,ConfigService,TaskStateMachine
type PersistenceService interface {
StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error
@ -48,6 +52,7 @@ type PersistenceService interface {
// FetchTask fetches the given task and the accompanying job.
FetchTask(ctx context.Context, taskID string) (*persistence.Task, error)
SaveTask(ctx context.Context, task *persistence.Task) error
SaveTaskActivity(ctx context.Context, t *persistence.Task) error
CreateWorker(ctx context.Context, w *persistence.Worker) error
FetchWorker(ctx context.Context, uuid string) (*persistence.Worker, error)
@ -59,12 +64,19 @@ type PersistenceService interface {
ScheduleTask(w *persistence.Worker) (*persistence.Task, error)
}
// TaskStateMachine interfaces task_state_machine.StateMachine.
var _ PersistenceService = (*persistence.DB)(nil)
type TaskStateMachine interface {
IsTaskStatusChangeValid(task *persistence.Task, newStatus api.TaskStatus) bool
// TaskStatusChange gives a Task a new status, and handles the resulting status changes on the job.
TaskStatusChange(ctx context.Context, task *persistence.Task, newStatus api.TaskStatus) error
// JobStatusChange gives a Job a new status, and handles the resulting status changes on its tasks.
JobStatusChange(ctx context.Context, job *persistence.Job, newJobStatus api.JobStatus) error
}
// TaskStateMachine should be a subset of task_state_machine.StateMachine.
var _ TaskStateMachine = (*task_state_machine.StateMachine)(nil)
type JobCompiler interface {
ListJobTypes() api.AvailableJobTypes
Compile(ctx context.Context, job api.SubmittedJob) (*job_compilers.AuthoredJob, error)
@ -80,15 +92,20 @@ type ConfigService interface {
VariableReplacer
}
var _ api.ServerInterface = (*Flamenco)(nil)
// NewFlamenco creates a new Flamenco service, using the given JobCompiler.
func NewFlamenco(jc JobCompiler, jps PersistenceService, ls LogStorage, cs ConfigService) *Flamenco {
// NewFlamenco creates a new Flamenco service.
func NewFlamenco(
jc JobCompiler,
jps PersistenceService,
ls LogStorage,
cs ConfigService,
sm TaskStateMachine,
) *Flamenco {
return &Flamenco{
jobCompiler: jc,
persist: jps,
logStorage: ls,
config: cs,
stateMachine: sm,
}
}

View File

@ -184,24 +184,26 @@ func (f *Flamenco) doTaskUpdate(
logger.Panic().Msg("dbTask.Job is nil, unable to continue")
}
var dbErr error
if update.TaskStatus != nil {
// TODO: check that this status transition is valid.
// TODO: process this status transition.
newStatus := *update.TaskStatus
logger.Info().
Str("oldStatus", string(dbTask.Status)).
Str("newStatus", string(newStatus)).
Msg("task changing status")
dbTask.Status = newStatus
oldTaskStatus := dbTask.Status
err := f.stateMachine.TaskStatusChange(ctx, dbTask, *update.TaskStatus)
if err != nil {
logger.Error().Err(err).
Str("newTaskStatus", string(*update.TaskStatus)).
Str("oldTaskStatus", string(oldTaskStatus)).
Msg("error changing task status")
dbErr = fmt.Errorf("changing status of task %s to %q: %w",
dbTask.UUID, *update.TaskStatus, err)
}
}
if update.Activity != nil {
dbTask.Activity = *update.Activity
dbErr = f.persist.SaveTaskActivity(ctx, dbTask)
}
// Do the database persistence first, as that's more important than the logging.
dbErr := f.persist.SaveTask(ctx, dbTask)
if update.Log != nil {
// Errors writing the log to file should be logged in our own logging
// system, but shouldn't abort the render. As such, `err` is not returned to

View File

@ -30,6 +30,10 @@ import (
"gitlab.com/blender/flamenco-ng-poc/pkg/api"
)
func ptr[T any](value T) *T {
return &value
}
func TestTaskUpdate(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
@ -38,12 +42,10 @@ func TestTaskUpdate(t *testing.T) {
worker := testWorker()
// Construct the JSON request object.
s := func(value string) *string { return &value }
ts := func(value api.TaskStatus) *api.TaskStatus { return &value }
taskUpdate := api.TaskUpdateJSONRequestBody{
Activity: s("testing"),
Log: s("line1\nline2\n"),
TaskStatus: ts(api.TaskStatusFailed),
Activity: ptr("testing"),
Log: ptr("line1\nline2\n"),
TaskStatus: ptr(api.TaskStatusFailed),
}
// Construct the task that's supposed to be updated.
@ -55,18 +57,28 @@ func TestTaskUpdate(t *testing.T) {
Worker: &worker,
WorkerID: &worker.ID,
Job: &mockJob,
Activity: "pre-update activity",
}
// Expect the task to be fetched.
mf.persistence.EXPECT().FetchTask(gomock.Any(), taskID).Return(&mockTask, nil)
// Expect the task to be saved.
var savedTask persistence.Task
mf.persistence.EXPECT().SaveTask(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, task *persistence.Task) error {
savedTask = *task
// Expect the task status change to be handed to the state machine.
var statusChangedtask persistence.Task
mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), gomock.AssignableToTypeOf(&persistence.Task{}), api.TaskStatusFailed).
DoAndReturn(func(ctx context.Context, task *persistence.Task, newStatus api.TaskStatus) error {
statusChangedtask = *task
return nil
})
// Expect the activity to be updated.
var actUpdatedTask persistence.Task
mf.persistence.EXPECT().SaveTaskActivity(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, task *persistence.Task) error {
actUpdatedTask = *task
return nil
})
// Expect the log to be written.
mf.logStorage.EXPECT().Write(gomock.Any(), jobID, taskID, "line1\nline2\n")
@ -76,5 +88,8 @@ func TestTaskUpdate(t *testing.T) {
// Check the saved task.
assert.NoError(t, err)
assert.Equal(t, mockTask.UUID, savedTask.UUID)
assert.Equal(t, mockTask.UUID, statusChangedtask.UUID)
assert.Equal(t, mockTask.UUID, actUpdatedTask.UUID)
assert.Equal(t, "pre-update activity", statusChangedtask.Activity) // the 'save' should come from the change in status.
assert.Equal(t, "testing", actUpdatedTask.Activity) // the activity should be saved separately.
}

View File

@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: gitlab.com/blender/flamenco-ng-poc/internal/manager/api_impl (interfaces: PersistenceService,JobCompiler,LogStorage,ConfigService)
// Source: gitlab.com/blender/flamenco-ng-poc/internal/manager/api_impl (interfaces: PersistenceService,JobCompiler,LogStorage,ConfigService,TaskStateMachine)
// Package mocks is a generated GoMock package.
package mocks
@ -112,6 +112,20 @@ func (mr *MockPersistenceServiceMockRecorder) SaveTask(arg0, arg1 interface{}) *
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveTask", reflect.TypeOf((*MockPersistenceService)(nil).SaveTask), arg0, arg1)
}
// SaveTaskActivity mocks base method.
func (m *MockPersistenceService) SaveTaskActivity(arg0 context.Context, arg1 *persistence.Task) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SaveTaskActivity", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// SaveTaskActivity indicates an expected call of SaveTaskActivity.
func (mr *MockPersistenceServiceMockRecorder) SaveTaskActivity(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveTaskActivity", reflect.TypeOf((*MockPersistenceService)(nil).SaveTaskActivity), arg0, arg1)
}
// SaveWorker mocks base method.
func (m *MockPersistenceService) SaveWorker(arg0 context.Context, arg1 *persistence.Worker) error {
m.ctrl.T.Helper()
@ -306,3 +320,54 @@ func (mr *MockConfigServiceMockRecorder) ExpandVariables(arg0, arg1, arg2 interf
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExpandVariables", reflect.TypeOf((*MockConfigService)(nil).ExpandVariables), arg0, arg1, arg2)
}
// MockTaskStateMachine is a mock of TaskStateMachine interface.
type MockTaskStateMachine struct {
ctrl *gomock.Controller
recorder *MockTaskStateMachineMockRecorder
}
// MockTaskStateMachineMockRecorder is the mock recorder for MockTaskStateMachine.
type MockTaskStateMachineMockRecorder struct {
mock *MockTaskStateMachine
}
// NewMockTaskStateMachine creates a new mock instance.
func NewMockTaskStateMachine(ctrl *gomock.Controller) *MockTaskStateMachine {
mock := &MockTaskStateMachine{ctrl: ctrl}
mock.recorder = &MockTaskStateMachineMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockTaskStateMachine) EXPECT() *MockTaskStateMachineMockRecorder {
return m.recorder
}
// JobStatusChange mocks base method.
func (m *MockTaskStateMachine) JobStatusChange(arg0 context.Context, arg1 *persistence.Job, arg2 api.JobStatus) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "JobStatusChange", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// JobStatusChange indicates an expected call of JobStatusChange.
func (mr *MockTaskStateMachineMockRecorder) JobStatusChange(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "JobStatusChange", reflect.TypeOf((*MockTaskStateMachine)(nil).JobStatusChange), arg0, arg1, arg2)
}
// TaskStatusChange mocks base method.
func (m *MockTaskStateMachine) TaskStatusChange(arg0 context.Context, arg1 *persistence.Task, arg2 api.TaskStatus) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "TaskStatusChange", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// TaskStatusChange indicates an expected call of TaskStatusChange.
func (mr *MockTaskStateMachineMockRecorder) TaskStatusChange(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TaskStatusChange", reflect.TypeOf((*MockTaskStateMachine)(nil).TaskStatusChange), arg0, arg1, arg2)
}

View File

@ -220,6 +220,13 @@ func (db *DB) SaveTask(ctx context.Context, t *Task) error {
return nil
}
func (db *DB) SaveTaskActivity(ctx context.Context, t *Task) error {
if err := db.gormDB.Model(t).Updates(Task{Activity: t.Activity}).Error; err != nil {
return fmt.Errorf("error saving task activity: %w", err)
}
return nil
}
func (db *DB) JobHasTasksInStatus(ctx context.Context, job *Job, taskStatus api.TaskStatus) (bool, error) {
var numTasksInStatus int64
tx := db.gormDB.Model(&Task{}).

View File

@ -212,6 +212,7 @@ func (sm *StateMachine) updateJobAfterTaskStatusChange(
}
}
// JobStatusChange gives a Job a new status, and handles the resulting status changes on its tasks.
func (sm *StateMachine) JobStatusChange(ctx context.Context, job *persistence.Job, newJobStatus api.JobStatus) error {
// Job status changes can trigger task status changes, which can trigger the
// next job status change. Keep looping over these job status changes until