diff --git a/cmd/flamenco-manager/main.go b/cmd/flamenco-manager/main.go index c7dd60a5..42373bbe 100644 --- a/cmd/flamenco-manager/main.go +++ b/cmd/flamenco-manager/main.go @@ -175,7 +175,7 @@ func runFlamencoManager() bool { shamanServer := buildShamanServer(configService, isFirstRun) jobDeleter := job_deleter.NewService(persist, localStorage, eventBroker, shamanServer) - farmStatus := farmstatus.NewService(persist) + farmStatus := farmstatus.NewService(persist, eventBroker) flamenco := api_impl.NewFlamenco( compiler, persist, eventBroker, logStorage, configService, diff --git a/internal/manager/eventbus/events_farmstatus.go b/internal/manager/eventbus/events_farmstatus.go new file mode 100644 index 00000000..6a03d39a --- /dev/null +++ b/internal/manager/eventbus/events_farmstatus.go @@ -0,0 +1,17 @@ +package eventbus + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "github.com/rs/zerolog/log" + "projects.blender.org/studio/flamenco/pkg/api" +) + +func NewFarmStatusEvent(farmstatus api.FarmStatusReport) api.EventFarmStatus { + return api.EventFarmStatus(farmstatus) +} + +func (b *Broker) BroadcastFarmStatusEvent(event api.EventFarmStatus) { + log.Debug().Interface("event", event).Msg("eventbus: broadcasting FarmStatus event") + b.broadcast(TopicFarmStatus, event) +} diff --git a/internal/manager/eventbus/topics.go b/internal/manager/eventbus/topics.go index ea2b6056..1e0678f8 100644 --- a/internal/manager/eventbus/topics.go +++ b/internal/manager/eventbus/topics.go @@ -7,6 +7,7 @@ import "fmt" const ( // Topics on which events are published. TopicLifeCycle EventTopic = "/lifecycle" // sends api.EventLifeCycle + TopicFarmStatus EventTopic = "/status" // sends api.EventFarmStatus TopicJobUpdate EventTopic = "/jobs" // sends api.EventJobUpdate TopicLastRenderedImage EventTopic = "/last-rendered" // sends api.EventLastRenderedUpdate TopicTaskUpdate EventTopic = "/task" // sends api.EventTaskUpdate diff --git a/internal/manager/farmstatus/farmstatus.go b/internal/manager/farmstatus/farmstatus.go index 69f673fb..0720db48 100644 --- a/internal/manager/farmstatus/farmstatus.go +++ b/internal/manager/farmstatus/farmstatus.go @@ -9,6 +9,7 @@ import ( "time" "github.com/rs/zerolog/log" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/pkg/api" "projects.blender.org/studio/flamenco/pkg/website" ) @@ -24,16 +25,18 @@ const ( // Service keeps track of the overall farm status. type Service struct { - persist PersistenceService + persist PersistenceService + eventbus EventBus mutex sync.Mutex lastReport api.FarmStatusReport } -func NewService(persist PersistenceService) *Service { +func NewService(persist PersistenceService, eventbus EventBus) *Service { return &Service{ - persist: persist, - mutex: sync.Mutex{}, + persist: persist, + eventbus: eventbus, + mutex: sync.Mutex{}, lastReport: api.FarmStatusReport{ Status: api.FarmStatusStarting, }, @@ -64,6 +67,18 @@ func (s *Service) Report() api.FarmStatusReport { return s.lastReport } +// updateStatusReport updates the last status report in a thread-safe way. +// It returns whether the report changed. +func (s *Service) updateStatusReport(report api.FarmStatusReport) bool { + s.mutex.Lock() + defer s.mutex.Unlock() + + reportChanged := s.lastReport != report + s.lastReport = report + + return reportChanged +} + func (s *Service) poll(ctx context.Context) { report := s.checkFarmStatus(ctx) if report == nil { @@ -71,9 +86,11 @@ func (s *Service) poll(ctx context.Context) { return } - s.mutex.Lock() - s.lastReport = *report - s.mutex.Unlock() + reportChanged := s.updateStatusReport(*report) + if reportChanged { + event := eventbus.NewFarmStatusEvent(s.lastReport) + s.eventbus.BroadcastFarmStatusEvent(event) + } } // checkFarmStatus checks the farm status by querying the peristence layer. diff --git a/internal/manager/farmstatus/farmstatus_test.go b/internal/manager/farmstatus/farmstatus_test.go index b7d7532c..adf99fda 100644 --- a/internal/manager/farmstatus/farmstatus_test.go +++ b/internal/manager/farmstatus/farmstatus_test.go @@ -14,9 +14,10 @@ import ( ) type Fixtures struct { - service *Service - persist *mocks.MockPersistenceService - ctx context.Context + service *Service + persist *mocks.MockPersistenceService + eventbus *mocks.MockEventBus + ctx context.Context } func TestFarmStatusStarting(t *testing.T) { @@ -158,6 +159,29 @@ func TestCheckFarmStatusAsleep(t *testing.T) { assert.Equal(t, api.FarmStatusAsleep, report.Status) } +func TestFarmStatusEvent(t *testing.T) { + f := fixtures(t) + + // "inoperative": no workers. + f.mockWorkerStatuses(persistence.WorkerStatusCount{}) + f.eventbus.EXPECT().BroadcastFarmStatusEvent(api.EventFarmStatus{ + Status: api.FarmStatusInoperative, + }) + f.service.poll(f.ctx) + + // Re-polling should not trigger any event, as the status doesn't change. + f.mockWorkerStatuses(persistence.WorkerStatusCount{}) + f.service.poll(f.ctx) + + // "active": Actively working on jobs. + f.mockWorkerStatuses(persistence.WorkerStatusCount{api.WorkerStatusAwake: 3}) + f.mockJobStatuses(persistence.JobStatusCount{api.JobStatusActive: 1}) + f.eventbus.EXPECT().BroadcastFarmStatusEvent(api.EventFarmStatus{ + Status: api.FarmStatusActive, + }) + f.service.poll(f.ctx) +} + func Test_allIn(t *testing.T) { type args struct { statuses map[api.WorkerStatus]int @@ -195,11 +219,12 @@ func fixtures(t *testing.T) *Fixtures { mockCtrl := gomock.NewController(t) f := Fixtures{ - persist: mocks.NewMockPersistenceService(mockCtrl), - ctx: context.Background(), + persist: mocks.NewMockPersistenceService(mockCtrl), + eventbus: mocks.NewMockEventBus(mockCtrl), + ctx: context.Background(), } - f.service = NewService(f.persist) + f.service = NewService(f.persist, f.eventbus) return &f } diff --git a/internal/manager/farmstatus/interfaces.go b/internal/manager/farmstatus/interfaces.go index a5687f56..38774a96 100644 --- a/internal/manager/farmstatus/interfaces.go +++ b/internal/manager/farmstatus/interfaces.go @@ -3,11 +3,13 @@ package farmstatus import ( "context" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/persistence" + "projects.blender.org/studio/flamenco/pkg/api" ) // Generate mock implementations of these interfaces. -//go:generate go run github.com/golang/mock/mockgen -destination mocks/interfaces_mock.gen.go -package mocks projects.blender.org/studio/flamenco/internal/manager/farmstatus PersistenceService +//go:generate go run github.com/golang/mock/mockgen -destination mocks/interfaces_mock.gen.go -package mocks projects.blender.org/studio/flamenco/internal/manager/farmstatus PersistenceService,EventBus type PersistenceService interface { SummarizeJobStatuses(ctx context.Context) (persistence.JobStatusCount, error) @@ -15,3 +17,9 @@ type PersistenceService interface { } var _ PersistenceService = (*persistence.DB)(nil) + +type EventBus interface { + BroadcastFarmStatusEvent(event api.EventFarmStatus) +} + +var _ EventBus = (*eventbus.Broker)(nil) diff --git a/internal/manager/farmstatus/mocks/interfaces_mock.gen.go b/internal/manager/farmstatus/mocks/interfaces_mock.gen.go index b252b4ea..820b6829 100644 --- a/internal/manager/farmstatus/mocks/interfaces_mock.gen.go +++ b/internal/manager/farmstatus/mocks/interfaces_mock.gen.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: projects.blender.org/studio/flamenco/internal/manager/farmstatus (interfaces: PersistenceService) +// Source: projects.blender.org/studio/flamenco/internal/manager/farmstatus (interfaces: PersistenceService,EventBus) // Package mocks is a generated GoMock package. package mocks @@ -10,6 +10,7 @@ import ( gomock "github.com/golang/mock/gomock" persistence "projects.blender.org/studio/flamenco/internal/manager/persistence" + api "projects.blender.org/studio/flamenco/pkg/api" ) // MockPersistenceService is a mock of PersistenceService interface. @@ -64,3 +65,38 @@ func (mr *MockPersistenceServiceMockRecorder) SummarizeWorkerStatuses(arg0 inter mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SummarizeWorkerStatuses", reflect.TypeOf((*MockPersistenceService)(nil).SummarizeWorkerStatuses), arg0) } + +// MockEventBus is a mock of EventBus interface. +type MockEventBus struct { + ctrl *gomock.Controller + recorder *MockEventBusMockRecorder +} + +// MockEventBusMockRecorder is the mock recorder for MockEventBus. +type MockEventBusMockRecorder struct { + mock *MockEventBus +} + +// NewMockEventBus creates a new mock instance. +func NewMockEventBus(ctrl *gomock.Controller) *MockEventBus { + mock := &MockEventBus{ctrl: ctrl} + mock.recorder = &MockEventBusMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockEventBus) EXPECT() *MockEventBusMockRecorder { + return m.recorder +} + +// BroadcastFarmStatusEvent mocks base method. +func (m *MockEventBus) BroadcastFarmStatusEvent(arg0 api.EventFarmStatus) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "BroadcastFarmStatusEvent", arg0) +} + +// BroadcastFarmStatusEvent indicates an expected call of BroadcastFarmStatusEvent. +func (mr *MockEventBusMockRecorder) BroadcastFarmStatusEvent(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastFarmStatusEvent", reflect.TypeOf((*MockEventBus)(nil).BroadcastFarmStatusEvent), arg0) +}