From 61cc8ff04d9156dafac9040723bea317f7a81a7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Thu, 29 Feb 2024 20:40:14 +0100 Subject: [PATCH] Manager: implement API operation to get the farm status Add a new API operation to get the overall farm status. This is based on the jobs and workers, and their status. The statuses are: - `active`: Actively working on jobs. - `idle`: Farm could be active, but has no work to do. - `waiting`: Work has been queued, but all workers are asleep. - `asleep`: Farm is idle, and all workers are asleep. - `inoperative`: Cannot work: no workers, or all are offline/error. - `starting`: Farm is starting up. - `unknown`: Unexpected configuration of worker and job statuses. --- cmd/flamenco-manager/main.go | 12 +- internal/manager/api_impl/api_impl.go | 3 + internal/manager/api_impl/interfaces.go | 9 +- internal/manager/api_impl/meta.go | 4 + .../api_impl/mocks/api_impl_mock.gen.go | 39 +++- internal/manager/api_impl/support_test.go | 5 +- internal/manager/farmstatus/farmstatus.go | 169 ++++++++++++++ .../manager/farmstatus/farmstatus_test.go | 213 ++++++++++++++++++ internal/manager/farmstatus/interfaces.go | 17 ++ .../farmstatus/mocks/interfaces_mock.gen.go | 66 ++++++ internal/manager/persistence/jobs_query.go | 30 +++ .../manager/persistence/jobs_query_test.go | 58 +++++ internal/manager/persistence/workers.go | 31 +++ internal/manager/persistence/workers_test.go | 63 ++++++ 14 files changed, 715 insertions(+), 4 deletions(-) create mode 100644 internal/manager/farmstatus/farmstatus.go create mode 100644 internal/manager/farmstatus/farmstatus_test.go create mode 100644 internal/manager/farmstatus/interfaces.go create mode 100644 internal/manager/farmstatus/mocks/interfaces_mock.gen.go diff --git a/cmd/flamenco-manager/main.go b/cmd/flamenco-manager/main.go index 8da0198f..c7dd60a5 100644 --- a/cmd/flamenco-manager/main.go +++ b/cmd/flamenco-manager/main.go @@ -27,6 +27,7 @@ import ( 
"projects.blender.org/studio/flamenco/internal/manager/api_impl/dummy" "projects.blender.org/studio/flamenco/internal/manager/config" "projects.blender.org/studio/flamenco/internal/manager/eventbus" + "projects.blender.org/studio/flamenco/internal/manager/farmstatus" "projects.blender.org/studio/flamenco/internal/manager/job_compilers" "projects.blender.org/studio/flamenco/internal/manager/job_deleter" "projects.blender.org/studio/flamenco/internal/manager/last_rendered" @@ -174,10 +175,12 @@ func runFlamencoManager() bool { shamanServer := buildShamanServer(configService, isFirstRun) jobDeleter := job_deleter.NewService(persist, localStorage, eventBroker, shamanServer) + farmStatus := farmstatus.NewService(persist) + flamenco := api_impl.NewFlamenco( compiler, persist, eventBroker, logStorage, configService, taskStateMachine, shamanServer, timeService, lastRender, - localStorage, sleepScheduler, jobDeleter) + localStorage, sleepScheduler, jobDeleter, farmStatus) e := buildWebService(flamenco, persist, ssdp, socketio, urls, localStorage) @@ -278,6 +281,13 @@ func runFlamencoManager() bool { jobDeleter.Run(mainCtx) }() + // Run the Farm Status service. + wg.Add(1) + go func() { + defer wg.Done() + farmStatus.Run(mainCtx) + }() + // Log the URLs last, hopefully that makes them more visible / encouraging to go to for users. go func() { time.Sleep(100 * time.Millisecond) diff --git a/internal/manager/api_impl/api_impl.go b/internal/manager/api_impl/api_impl.go index 6e109481..b923d3f8 100644 --- a/internal/manager/api_impl/api_impl.go +++ b/internal/manager/api_impl/api_impl.go @@ -28,6 +28,7 @@ type Flamenco struct { localStorage LocalStorage sleepScheduler WorkerSleepScheduler jobDeleter JobDeleter + farmstatus FarmStatusService // The task scheduler can be locked to prevent multiple Workers from getting // the same task. 
It is also used for certain other queries, like @@ -55,6 +56,7 @@ func NewFlamenco( localStorage LocalStorage, wss WorkerSleepScheduler, jd JobDeleter, + farmstatus FarmStatusService, ) *Flamenco { return &Flamenco{ jobCompiler: jc, @@ -69,6 +71,7 @@ func NewFlamenco( localStorage: localStorage, sleepScheduler: wss, jobDeleter: jd, + farmstatus: farmstatus, done: make(chan struct{}), } diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index fe549259..d965519f 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -15,6 +15,7 @@ import ( "projects.blender.org/studio/flamenco/internal/manager/config" "projects.blender.org/studio/flamenco/internal/manager/eventbus" + "projects.blender.org/studio/flamenco/internal/manager/farmstatus" "projects.blender.org/studio/flamenco/internal/manager/job_compilers" "projects.blender.org/studio/flamenco/internal/manager/job_deleter" "projects.blender.org/studio/flamenco/internal/manager/last_rendered" @@ -26,7 +27,7 @@ import ( ) // Generate mock implementations of these interfaces. 
-//go:generate go run github.com/golang/mock/mockgen -destination mocks/api_impl_mock.gen.go -package mocks projects.blender.org/studio/flamenco/internal/manager/api_impl PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered,LocalStorage,WorkerSleepScheduler,JobDeleter +//go:generate go run github.com/golang/mock/mockgen -destination mocks/api_impl_mock.gen.go -package mocks projects.blender.org/studio/flamenco/internal/manager/api_impl PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered,LocalStorage,WorkerSleepScheduler,JobDeleter,FarmStatusService type PersistenceService interface { StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error @@ -239,3 +240,9 @@ type JobDeleter interface { } var _ JobDeleter = (*job_deleter.Service)(nil) + +type FarmStatusService interface { + Report() api.FarmStatusReport +} + +var _ FarmStatusService = (*farmstatus.Service)(nil) diff --git a/internal/manager/api_impl/meta.go b/internal/manager/api_impl/meta.go index c647270b..000bd5de 100644 --- a/internal/manager/api_impl/meta.go +++ b/internal/manager/api_impl/meta.go @@ -321,6 +321,10 @@ func (f *Flamenco) SaveSetupAssistantConfig(e echo.Context) error { return e.NoContent(http.StatusNoContent) } +func (f *Flamenco) GetFarmStatus(e echo.Context) error { + return e.JSON(http.StatusOK, f.farmstatus.Report()) +} + func flamencoManagerDir() (string, error) { exename, err := os.Executable() if err != nil { diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index ccc776b6..a7360e98 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. 
-// Source: projects.blender.org/studio/flamenco/internal/manager/api_impl (interfaces: PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered,LocalStorage,WorkerSleepScheduler,JobDeleter) +// Source: projects.blender.org/studio/flamenco/internal/manager/api_impl (interfaces: PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered,LocalStorage,WorkerSleepScheduler,JobDeleter,FarmStatusService) // Package mocks is a generated GoMock package. package mocks @@ -1413,3 +1413,40 @@ func (mr *MockJobDeleterMockRecorder) WhatWouldBeDeleted(arg0 interface{}) *gomo mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WhatWouldBeDeleted", reflect.TypeOf((*MockJobDeleter)(nil).WhatWouldBeDeleted), arg0) } + +// MockFarmStatusService is a mock of FarmStatusService interface. +type MockFarmStatusService struct { + ctrl *gomock.Controller + recorder *MockFarmStatusServiceMockRecorder +} + +// MockFarmStatusServiceMockRecorder is the mock recorder for MockFarmStatusService. +type MockFarmStatusServiceMockRecorder struct { + mock *MockFarmStatusService +} + +// NewMockFarmStatusService creates a new mock instance. +func NewMockFarmStatusService(ctrl *gomock.Controller) *MockFarmStatusService { + mock := &MockFarmStatusService{ctrl: ctrl} + mock.recorder = &MockFarmStatusServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockFarmStatusService) EXPECT() *MockFarmStatusServiceMockRecorder { + return m.recorder +} + +// Report mocks base method. +func (m *MockFarmStatusService) Report() api.FarmStatusReport { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Report") + ret0, _ := ret[0].(api.FarmStatusReport) + return ret0 +} + +// Report indicates an expected call of Report. 
+func (mr *MockFarmStatusServiceMockRecorder) Report() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Report", reflect.TypeOf((*MockFarmStatusService)(nil).Report)) +} diff --git a/internal/manager/api_impl/support_test.go b/internal/manager/api_impl/support_test.go index 4a7ea588..33e712fd 100644 --- a/internal/manager/api_impl/support_test.go +++ b/internal/manager/api_impl/support_test.go @@ -37,6 +37,7 @@ type mockedFlamenco struct { localStorage *mocks.MockLocalStorage sleepScheduler *mocks.MockWorkerSleepScheduler jobDeleter *mocks.MockJobDeleter + farmstatus *mocks.MockFarmStatusService // Place for some tests to store a temporary directory. tempdir string @@ -54,6 +55,7 @@ func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco { localStore := mocks.NewMockLocalStorage(mockCtrl) wss := mocks.NewMockWorkerSleepScheduler(mockCtrl) jd := mocks.NewMockJobDeleter(mockCtrl) + fs := mocks.NewMockFarmStatusService(mockCtrl) clock := clock.NewMock() mockedNow, err := time.Parse(time.RFC3339, "2022-06-09T11:14:41+02:00") @@ -62,7 +64,7 @@ func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco { } clock.Set(mockedNow) - f := NewFlamenco(jc, ps, cb, logStore, cs, sm, sha, clock, lr, localStore, wss, jd) + f := NewFlamenco(jc, ps, cb, logStore, cs, sm, sha, clock, lr, localStore, wss, jd, fs) return mockedFlamenco{ flamenco: f, @@ -78,6 +80,7 @@ func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco { localStorage: localStore, sleepScheduler: wss, jobDeleter: jd, + farmstatus: fs, } } diff --git a/internal/manager/farmstatus/farmstatus.go b/internal/manager/farmstatus/farmstatus.go new file mode 100644 index 00000000..69f673fb --- /dev/null +++ b/internal/manager/farmstatus/farmstatus.go @@ -0,0 +1,169 @@ +// package farmstatus provides a status indicator for the entire Flamenco farm. 
+package farmstatus + +import ( + "context" + "errors" + "slices" + "sync" + "time" + + "github.com/rs/zerolog/log" + "projects.blender.org/studio/flamenco/pkg/api" + "projects.blender.org/studio/flamenco/pkg/website" +) + +const ( + // pollWait determines how often the persistence layer is queried to get the + // counts & statuses of workers and jobs. + // + // Note that this indicates the time between polls, so between a poll + // operation being done, and the next one starting. + pollWait = 5 * time.Second +) + +// Service keeps track of the overall farm status. +type Service struct { + persist PersistenceService + + mutex sync.Mutex + lastReport api.FarmStatusReport +} + +func NewService(persist PersistenceService) *Service { + return &Service{ + persist: persist, + mutex: sync.Mutex{}, + lastReport: api.FarmStatusReport{ + Status: api.FarmStatusStarting, + }, + } +} + +// Run the farm status polling loop. +func (s *Service) Run(ctx context.Context) { + log.Debug().Msg("farm status: polling service running") + defer log.Debug().Msg("farm status: polling service stopped") + + for { + select { + case <-ctx.Done(): + return + case <-time.After(pollWait): + s.poll(ctx) + } + } +} + +// Report returns the last-known farm status report. +// +// It is updated every few seconds, from the Run() function. +func (s *Service) Report() api.FarmStatusReport { + s.mutex.Lock() + defer s.mutex.Unlock() + return s.lastReport +} + +func (s *Service) poll(ctx context.Context) { + report := s.checkFarmStatus(ctx) + if report == nil { + // Already logged, just keep the last known report around for querying. + return + } + + s.mutex.Lock() + s.lastReport = *report + s.mutex.Unlock() +} + +// checkFarmStatus checks the farm status by querying the persistence layer. +// This function does not return an error, but instead logs them as warnings and returns nil. 
+func (s *Service) checkFarmStatus(ctx context.Context) *api.FarmStatusReport { + log.Trace().Msg("farm status: checking the farm status") + startTime := time.Now() + + defer func() { + duration := time.Since(startTime) + log.Debug().Stringer("duration", duration).Msg("farm status: checked the farm status") + }() + + workerStatuses, err := s.persist.SummarizeWorkerStatuses(ctx) + if err != nil { + logDBError(err, "farm status: could not summarize worker statuses") + return nil + } + + // Check some worker statuses first. When there are no workers and the farm is + // inoperative, there is little use in checking jobs. At least for now. Maybe + // later we want to have some info in the reported status that indicates a + // more pressing matter (as in, inoperative AND a job is queued). + + // Check: inoperative + if len(workerStatuses) == 0 || allIn(workerStatuses, api.WorkerStatusOffline, api.WorkerStatusError) { + return &api.FarmStatusReport{ + Status: api.FarmStatusInoperative, + } + } + + jobStatuses, err := s.persist.SummarizeJobStatuses(ctx) + if err != nil { + logDBError(err, "farm status: could not summarize job statuses") + return nil + } + + anyJobActive := jobStatuses[api.JobStatusActive] > 0 + anyJobQueued := jobStatuses[api.JobStatusQueued] > 0 + isWorkAvailable := anyJobActive || anyJobQueued + + anyWorkerAwake := workerStatuses[api.WorkerStatusAwake] > 0 + anyWorkerAsleep := workerStatuses[api.WorkerStatusAsleep] > 0 + allWorkersAsleep := !anyWorkerAwake && anyWorkerAsleep + + report := api.FarmStatusReport{} + switch { + case anyJobActive && anyWorkerAwake: + // - "active" # Actively working on jobs. + report.Status = api.FarmStatusActive + case isWorkAvailable: + // - "waiting" # Work to be done, but there is no worker awake. + report.Status = api.FarmStatusWaiting + case !isWorkAvailable && allWorkersAsleep: + // - "asleep" # Farm is idle, and all workers are asleep. 
+ report.Status = api.FarmStatusAsleep + case !isWorkAvailable: + // - "idle" # Farm could be active, but has no work to do. + report.Status = api.FarmStatusIdle + default: + log.Warn(). + Interface("workerStatuses", workerStatuses). + Interface("jobStatuses", jobStatuses). + Msgf("farm status: unexpected configuration of worker and job statuses, please report this at %s", website.BugReportURL) + report.Status = api.FarmStatusUnknown + } + + return &report +} + +func logDBError(err error, message string) { + switch { + case errors.Is(err, context.DeadlineExceeded): + log.Warn().Msg(message + " (it took too long)") + case errors.Is(err, context.Canceled): + log.Debug().Msg(message + " (Flamenco is shutting down)") + default: + log.Warn().AnErr("cause", err).Msg(message) + } +} + +func allIn[T comparable](statuses map[T]int, shouldBeIn ...T) bool { + for status, count := range statuses { + if count == 0 { + continue + } + + if !slices.Contains(shouldBeIn, status) { + return false + } + } + return true +} diff --git a/internal/manager/farmstatus/farmstatus_test.go b/internal/manager/farmstatus/farmstatus_test.go new file mode 100644 index 00000000..b7d7532c --- /dev/null +++ b/internal/manager/farmstatus/farmstatus_test.go @@ -0,0 +1,213 @@ +// package farmstatus provides a status indicator for the entire Flamenco farm. 
+package farmstatus + +import ( + "context" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "projects.blender.org/studio/flamenco/internal/manager/farmstatus/mocks" + "projects.blender.org/studio/flamenco/internal/manager/persistence" + "projects.blender.org/studio/flamenco/pkg/api" +) + +type Fixtures struct { + service *Service + persist *mocks.MockPersistenceService + ctx context.Context +} + +func TestFarmStatusStarting(t *testing.T) { + f := fixtures(t) + report := f.service.Report() + assert.Equal(t, api.FarmStatusStarting, report.Status) +} + +func TestFarmStatusLoop(t *testing.T) { + f := fixtures(t) + + // Mock an "active" status. + f.mockWorkerStatuses(persistence.WorkerStatusCount{ + api.WorkerStatusOffline: 2, + api.WorkerStatusAsleep: 1, + api.WorkerStatusError: 1, + api.WorkerStatusAwake: 3, + }) + f.mockJobStatuses(persistence.JobStatusCount{ + api.JobStatusActive: 1, + }) + + // Before polling, the status should still be 'starting'. + report := f.service.Report() + assert.Equal(t, api.FarmStatusStarting, report.Status) + + // After a single poll, the report should have been updated. + f.service.poll(f.ctx) + report = f.service.Report() + assert.Equal(t, api.FarmStatusActive, report.Status) +} + +func TestCheckFarmStatusInoperative(t *testing.T) { + f := fixtures(t) + + // "inoperative": no workers. + f.mockWorkerStatuses(persistence.WorkerStatusCount{}) + report := f.service.checkFarmStatus(f.ctx) + require.NotNil(t, report) + assert.Equal(t, api.FarmStatusInoperative, report.Status) + + // "inoperative": all workers offline. 
+ f.mockWorkerStatuses(persistence.WorkerStatusCount{ + api.WorkerStatusOffline: 3, + }) + report = f.service.checkFarmStatus(f.ctx) + require.NotNil(t, report) + assert.Equal(t, api.FarmStatusInoperative, report.Status) + + // "inoperative": some workers offline, some in error, + f.mockWorkerStatuses(persistence.WorkerStatusCount{ + api.WorkerStatusOffline: 2, + api.WorkerStatusError: 1, + }) + report = f.service.checkFarmStatus(f.ctx) + require.NotNil(t, report) + assert.Equal(t, api.FarmStatusInoperative, report.Status) +} + +func TestCheckFarmStatusActive(t *testing.T) { + f := fixtures(t) + + // "active" # Actively working on jobs. + f.mockWorkerStatuses(persistence.WorkerStatusCount{ + api.WorkerStatusOffline: 2, + api.WorkerStatusAsleep: 1, + api.WorkerStatusError: 1, + api.WorkerStatusAwake: 3, + }) + f.mockJobStatuses(persistence.JobStatusCount{ + api.JobStatusActive: 1, + }) + report := f.service.checkFarmStatus(f.ctx) + require.NotNil(t, report) + assert.Equal(t, api.FarmStatusActive, report.Status) +} + +func TestCheckFarmStatusWaiting(t *testing.T) { + f := fixtures(t) + + // "waiting": Active job, and only sleeping workers. + f.mockWorkerStatuses(persistence.WorkerStatusCount{ + api.WorkerStatusAsleep: 1, + }) + f.mockJobStatuses(persistence.JobStatusCount{ + api.JobStatusActive: 1, + }) + report := f.service.checkFarmStatus(f.ctx) + require.NotNil(t, report) + assert.Equal(t, api.FarmStatusWaiting, report.Status) + + // "waiting": Queued job, and awake worker. It could pick up the job any + // second now, but it could also have been blocklisted already. 
+ f.mockWorkerStatuses(persistence.WorkerStatusCount{ + api.WorkerStatusAsleep: 1, + api.WorkerStatusAwake: 1, + }) + f.mockJobStatuses(persistence.JobStatusCount{ + api.JobStatusQueued: 1, + }) + report = f.service.checkFarmStatus(f.ctx) + require.NotNil(t, report) + assert.Equal(t, api.FarmStatusWaiting, report.Status) +} + +func TestCheckFarmStatusIdle(t *testing.T) { + f := fixtures(t) + + // "idle" # Farm could be active, but has no work to do. + f.mockWorkerStatuses(persistence.WorkerStatusCount{ + api.WorkerStatusOffline: 2, + api.WorkerStatusAsleep: 1, + api.WorkerStatusAwake: 1, + }) + f.mockJobStatuses(persistence.JobStatusCount{ + api.JobStatusCompleted: 1, + api.JobStatusCancelRequested: 1, + }) + report := f.service.checkFarmStatus(f.ctx) + require.NotNil(t, report) + assert.Equal(t, api.FarmStatusIdle, report.Status) +} + +func TestCheckFarmStatusAsleep(t *testing.T) { + f := fixtures(t) + + // "asleep": No worker is awake, some are asleep, no work to do. + f.mockWorkerStatuses(persistence.WorkerStatusCount{ + api.WorkerStatusOffline: 2, + api.WorkerStatusAsleep: 2, + }) + f.mockJobStatuses(persistence.JobStatusCount{ + api.JobStatusCanceled: 10, + api.JobStatusCompleted: 4, + api.JobStatusFailed: 2, + }) + report := f.service.checkFarmStatus(f.ctx) + require.NotNil(t, report) + assert.Equal(t, api.FarmStatusAsleep, report.Status) +} + +func Test_allIn(t *testing.T) { + type args struct { + statuses map[api.WorkerStatus]int + shouldBeIn []api.WorkerStatus + } + tests := []struct { + name string + args args + want bool + }{ + {"none", args{map[api.WorkerStatus]int{}, []api.WorkerStatus{api.WorkerStatusAsleep}}, true}, + {"match-only", args{ + map[api.WorkerStatus]int{api.WorkerStatusAsleep: 5}, + []api.WorkerStatus{api.WorkerStatusAsleep}, + }, true}, + {"match-some", args{ + map[api.WorkerStatus]int{api.WorkerStatusAsleep: 5, api.WorkerStatusOffline: 2}, + []api.WorkerStatus{api.WorkerStatusAsleep}, + }, false}, + {"match-all", args{ + 
map[api.WorkerStatus]int{api.WorkerStatusAsleep: 5, api.WorkerStatusOffline: 2}, + []api.WorkerStatus{api.WorkerStatusAsleep, api.WorkerStatusOffline}, + }, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := allIn(tt.args.statuses, tt.args.shouldBeIn...); got != tt.want { + t.Errorf("allIn() = %v, want %v", got, tt.want) + } + }) + } +} + +func fixtures(t *testing.T) *Fixtures { + mockCtrl := gomock.NewController(t) + + f := Fixtures{ + persist: mocks.NewMockPersistenceService(mockCtrl), + ctx: context.Background(), + } + + f.service = NewService(f.persist) + + return &f +} + +func (f *Fixtures) mockWorkerStatuses(workerStatuses persistence.WorkerStatusCount) { + f.persist.EXPECT().SummarizeWorkerStatuses(f.ctx).Return(workerStatuses, nil) +} + +func (f *Fixtures) mockJobStatuses(jobStatuses persistence.JobStatusCount) { + f.persist.EXPECT().SummarizeJobStatuses(f.ctx).Return(jobStatuses, nil) +} diff --git a/internal/manager/farmstatus/interfaces.go b/internal/manager/farmstatus/interfaces.go new file mode 100644 index 00000000..a5687f56 --- /dev/null +++ b/internal/manager/farmstatus/interfaces.go @@ -0,0 +1,17 @@ +package farmstatus + +import ( + "context" + + "projects.blender.org/studio/flamenco/internal/manager/persistence" +) + +// Generate mock implementations of these interfaces. 
+//go:generate go run github.com/golang/mock/mockgen -destination mocks/interfaces_mock.gen.go -package mocks projects.blender.org/studio/flamenco/internal/manager/farmstatus PersistenceService + +type PersistenceService interface { + SummarizeJobStatuses(ctx context.Context) (persistence.JobStatusCount, error) + SummarizeWorkerStatuses(ctx context.Context) (persistence.WorkerStatusCount, error) +} + +var _ PersistenceService = (*persistence.DB)(nil) diff --git a/internal/manager/farmstatus/mocks/interfaces_mock.gen.go b/internal/manager/farmstatus/mocks/interfaces_mock.gen.go new file mode 100644 index 00000000..b252b4ea --- /dev/null +++ b/internal/manager/farmstatus/mocks/interfaces_mock.gen.go @@ -0,0 +1,66 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: projects.blender.org/studio/flamenco/internal/manager/farmstatus (interfaces: PersistenceService) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + persistence "projects.blender.org/studio/flamenco/internal/manager/persistence" +) + +// MockPersistenceService is a mock of PersistenceService interface. +type MockPersistenceService struct { + ctrl *gomock.Controller + recorder *MockPersistenceServiceMockRecorder +} + +// MockPersistenceServiceMockRecorder is the mock recorder for MockPersistenceService. +type MockPersistenceServiceMockRecorder struct { + mock *MockPersistenceService +} + +// NewMockPersistenceService creates a new mock instance. +func NewMockPersistenceService(ctrl *gomock.Controller) *MockPersistenceService { + mock := &MockPersistenceService{ctrl: ctrl} + mock.recorder = &MockPersistenceServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPersistenceService) EXPECT() *MockPersistenceServiceMockRecorder { + return m.recorder +} + +// SummarizeJobStatuses mocks base method. 
+func (m *MockPersistenceService) SummarizeJobStatuses(arg0 context.Context) (persistence.JobStatusCount, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SummarizeJobStatuses", arg0) + ret0, _ := ret[0].(persistence.JobStatusCount) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SummarizeJobStatuses indicates an expected call of SummarizeJobStatuses. +func (mr *MockPersistenceServiceMockRecorder) SummarizeJobStatuses(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SummarizeJobStatuses", reflect.TypeOf((*MockPersistenceService)(nil).SummarizeJobStatuses), arg0) +} + +// SummarizeWorkerStatuses mocks base method. +func (m *MockPersistenceService) SummarizeWorkerStatuses(arg0 context.Context) (persistence.WorkerStatusCount, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SummarizeWorkerStatuses", arg0) + ret0, _ := ret[0].(persistence.WorkerStatusCount) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SummarizeWorkerStatuses indicates an expected call of SummarizeWorkerStatuses. +func (mr *MockPersistenceServiceMockRecorder) SummarizeWorkerStatuses(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SummarizeWorkerStatuses", reflect.TypeOf((*MockPersistenceService)(nil).SummarizeWorkerStatuses), arg0) +} diff --git a/internal/manager/persistence/jobs_query.go b/internal/manager/persistence/jobs_query.go index c4431b05..fe040aa6 100644 --- a/internal/manager/persistence/jobs_query.go +++ b/internal/manager/persistence/jobs_query.go @@ -86,3 +86,33 @@ func (db *DB) QueryJobTaskSummaries(ctx context.Context, jobUUID string) ([]*Tas return result, tx.Error } + +// JobStatusCount is a mapping from job status to the number of jobs in that status. 
+type JobStatusCount map[api.JobStatus]int + +func (db *DB) SummarizeJobStatuses(ctx context.Context) (JobStatusCount, error) { + logger := log.Ctx(ctx) + logger.Debug().Msg("database: summarizing job statuses") + + // Query the database using a data structure that's easy to handle in GORM. + type queryResult struct { + Status api.JobStatus + StatusCount int + } + result := []*queryResult{} + tx := db.gormDB.WithContext(ctx).Model(&Job{}). + Select("status as Status", "count(id) as StatusCount"). + Group("status"). + Scan(&result) + if tx.Error != nil { + return nil, jobError(tx.Error, "summarizing job statuses") + } + + // Convert the array-of-structs to a map that's easier to handle by the caller. + statusCounts := make(JobStatusCount) + for _, singleStatusCount := range result { + statusCounts[singleStatusCount.Status] = singleStatusCount.StatusCount + } + + return statusCounts, nil +} diff --git a/internal/manager/persistence/jobs_query_test.go b/internal/manager/persistence/jobs_query_test.go index 6229b799..ad0848fb 100644 --- a/internal/manager/persistence/jobs_query_test.go +++ b/internal/manager/persistence/jobs_query_test.go @@ -4,9 +4,12 @@ package persistence // SPDX-License-Identifier: GPL-3.0-or-later import ( + "context" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "projects.blender.org/studio/flamenco/internal/manager/job_compilers" "projects.blender.org/studio/flamenco/internal/uuid" @@ -141,3 +144,58 @@ func TestQueryJobTaskSummaries(t *testing.T) { assert.True(t, expectTaskUUIDs[summary.UUID], "%q should be in %v", summary.UUID, expectTaskUUIDs) } } + +func TestSummarizeJobStatuses(t *testing.T) { + ctx, close, db, job1, authoredJob1 := jobTasksTestFixtures(t) + defer close() + + // Create another job + authoredJob2 := duplicateJobAndTasks(authoredJob1) + job2 := persistAuthoredJob(t, ctx, db, authoredJob2) + + // Test the summary. 
+ summary, err := db.SummarizeJobStatuses(ctx) + require.NoError(t, err) + assert.Equal(t, JobStatusCount{api.JobStatusUnderConstruction: 2}, summary) + + // Change the jobs so that each has a unique status. + job1.Status = api.JobStatusQueued + require.NoError(t, db.SaveJobStatus(ctx, job1)) + job2.Status = api.JobStatusFailed + require.NoError(t, db.SaveJobStatus(ctx, job2)) + + // Test the summary. + summary, err = db.SummarizeJobStatuses(ctx) + require.NoError(t, err) + assert.Equal(t, JobStatusCount{ + api.JobStatusQueued: 1, + api.JobStatusFailed: 1, + }, summary) + + // Delete all jobs. + require.NoError(t, db.DeleteJob(ctx, job1.UUID)) + require.NoError(t, db.DeleteJob(ctx, job2.UUID)) + + // Test the summary. + summary, err = db.SummarizeJobStatuses(ctx) + require.NoError(t, err) + assert.Equal(t, JobStatusCount{}, summary) +} + +// Check that a context timeout can be detected by inspecting the +// returned error. +func TestSummarizeJobStatusesTimeout(t *testing.T) { + ctx, close, db, _, _ := jobTasksTestFixtures(t) + defer close() + + subCtx, subCtxCancel := context.WithTimeout(ctx, 1*time.Nanosecond) + defer subCtxCancel() + + // Force a timeout of the context. And yes, even when a nanosecond is quite + // short, it is still necessary to wait. + time.Sleep(2 * time.Nanosecond) + + summary, err := db.SummarizeJobStatuses(subCtx) + assert.ErrorIs(t, err, context.DeadlineExceeded) + assert.Nil(t, summary) +} diff --git a/internal/manager/persistence/workers.go b/internal/manager/persistence/workers.go index a8e73d4e..60ec853b 100644 --- a/internal/manager/persistence/workers.go +++ b/internal/manager/persistence/workers.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/rs/zerolog/log" "gorm.io/gorm" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -176,3 +177,33 @@ func (db *DB) WorkerSeen(ctx context.Context, w *Worker) error { } return nil } + +// WorkerStatusCount is a mapping from worker status to the number of workers in that status. 
+type WorkerStatusCount map[api.WorkerStatus]int + +func (db *DB) SummarizeWorkerStatuses(ctx context.Context) (WorkerStatusCount, error) { + logger := log.Ctx(ctx) + logger.Debug().Msg("database: summarizing worker statuses") + + // Query the database using a data structure that's easy to handle in GORM. + type queryResult struct { + Status api.WorkerStatus + StatusCount int + } + result := []*queryResult{} + tx := db.gormDB.WithContext(ctx).Model(&Worker{}). + Select("status as Status", "count(id) as StatusCount"). + Group("status"). + Scan(&result) + if tx.Error != nil { + return nil, workerError(tx.Error, "summarizing worker statuses") + } + + // Convert the array-of-structs to a map that's easier to handle by the caller. + statusCounts := make(WorkerStatusCount) + for _, singleStatusCount := range result { + statusCounts[singleStatusCount.Status] = singleStatusCount.StatusCount + } + + return statusCounts, nil +} diff --git a/internal/manager/persistence/workers_test.go b/internal/manager/persistence/workers_test.go index b58db75d..e1f62f1b 100644 --- a/internal/manager/persistence/workers_test.go +++ b/internal/manager/persistence/workers_test.go @@ -4,6 +4,7 @@ package persistence // SPDX-License-Identifier: GPL-3.0-or-later import ( + "context" "testing" "time" @@ -334,3 +335,65 @@ func TestDeleteWorkerWithTagAssigned(t *testing.T) { require.NoError(t, err) assert.Empty(t, tag.Workers) } + +func TestSummarizeWorkerStatuses(t *testing.T) { + f := workerTestFixtures(t, 1*time.Second) + defer f.done() + + // Test the summary. + summary, err := f.db.SummarizeWorkerStatuses(f.ctx) + require.NoError(t, err) + assert.Equal(t, WorkerStatusCount{api.WorkerStatusAwake: 1}, summary) + + // Create more workers. 
+ w1 := Worker{ + UUID: "fd97a35b-a5bd-44b4-ac2b-64c193ca877d", + Name: "Worker 1", + Status: api.WorkerStatusAwake, + } + w2 := Worker{ + UUID: "82b2d176-cb8c-4bfa-8300-41c216d766df", + Name: "Worker 2", + Status: api.WorkerStatusOffline, + } + + require.NoError(t, f.db.CreateWorker(f.ctx, &w1)) + require.NoError(t, f.db.CreateWorker(f.ctx, &w2)) + + // Test the summary. + summary, err = f.db.SummarizeWorkerStatuses(f.ctx) + require.NoError(t, err) + assert.Equal(t, WorkerStatusCount{ + api.WorkerStatusAwake: 2, + api.WorkerStatusOffline: 1, + }, summary) + + // Delete all workers. + require.NoError(t, f.db.DeleteWorker(f.ctx, f.worker.UUID)) + require.NoError(t, f.db.DeleteWorker(f.ctx, w1.UUID)) + require.NoError(t, f.db.DeleteWorker(f.ctx, w2.UUID)) + + // Test the summary. + summary, err = f.db.SummarizeWorkerStatuses(f.ctx) + require.NoError(t, err) + assert.Equal(t, WorkerStatusCount{}, summary) +} + +// Check that a context timeout can be detected by inspecting the +// returned error. +func TestSummarizeWorkerStatusesTimeout(t *testing.T) { + f := workerTestFixtures(t, 1*time.Second) + defer f.done() + + subCtx, subCtxCancel := context.WithTimeout(f.ctx, 1*time.Nanosecond) + defer subCtxCancel() + + // Force a timeout of the context. And yes, even when a nanosecond is quite + // short, it is still necessary to wait. + time.Sleep(2 * time.Nanosecond) + + // Test the summary. + summary, err := f.db.SummarizeWorkerStatuses(subCtx) + assert.ErrorIs(t, err, context.DeadlineExceeded) + assert.Nil(t, summary) +}