diff --git a/internal/manager/eventbus/eventbus.go b/internal/manager/eventbus/eventbus.go index 8d533dd6..27392020 100644 --- a/internal/manager/eventbus/eventbus.go +++ b/internal/manager/eventbus/eventbus.go @@ -10,17 +10,25 @@ type ( EventTopic string ) +// Listener is the interface for internal components that want to respond to events. +type Listener interface { + OnEvent(topic EventTopic, payload interface{}) +} + +// Forwarder is the interface for components that forward events to external systems. type Forwarder interface { Broadcast(topic EventTopic, payload interface{}) } type Broker struct { + listeners []Listener forwarders []Forwarder mutex sync.Mutex } func NewBroker() *Broker { return &Broker{ + listeners: []Listener{}, forwarders: []Forwarder{}, mutex: sync.Mutex{}, } @@ -32,10 +40,20 @@ func (b *Broker) AddForwarder(forwarder Forwarder) { b.forwarders = append(b.forwarders, forwarder) } +func (b *Broker) AddListener(listener Listener) { + b.mutex.Lock() + defer b.mutex.Unlock() + b.listeners = append(b.listeners, listener) +} + func (b *Broker) broadcast(topic EventTopic, payload interface{}) { b.mutex.Lock() defer b.mutex.Unlock() + for _, listener := range b.listeners { + listener.OnEvent(topic, payload) + } + for _, forwarder := range b.forwarders { forwarder.Broadcast(topic, payload) } diff --git a/internal/manager/eventbus/socketio.go b/internal/manager/eventbus/socketio.go index 1ff4e190..57f59405 100644 --- a/internal/manager/eventbus/socketio.go +++ b/internal/manager/eventbus/socketio.go @@ -24,6 +24,8 @@ const ( ) var socketIOEventTypes = map[string]string{ + reflect.TypeOf(api.EventLifeCycle{}).Name(): "/lifecycle", + reflect.TypeOf(api.EventFarmStatus{}).Name(): "/status", reflect.TypeOf(api.EventJobUpdate{}).Name(): "/jobs", reflect.TypeOf(api.EventTaskUpdate{}).Name(): "/task", reflect.TypeOf(api.EventLastRenderedUpdate{}).Name(): "/last-rendered", @@ -91,6 +93,10 @@ func (s *SocketIOForwarder) registerSIOEventHandlers() { _ = sio.On(gosocketio.OnConnection, func(c *gosocketio.Channel) { logger := sioLogger(c) logger.Debug().Msg("socketIO: connected") + + // All SocketIO connections get these events, regardless of their subscription. + _ = c.Join(string(TopicLifeCycle)) + _ = c.Join(string(TopicFarmStatus)) }) // socket disconnection diff --git a/internal/manager/farmstatus/farmstatus.go b/internal/manager/farmstatus/farmstatus.go index 0720db48..82c440fd 100644 --- a/internal/manager/farmstatus/farmstatus.go +++ b/internal/manager/farmstatus/farmstatus.go @@ -20,7 +20,7 @@ const ( // // Note that this indicates the time between polls, so between a poll // operation being done, and the next one starting. - pollWait = 5 * time.Second + pollWait = 30 * time.Second ) // Service keeps track of the overall farm status. @@ -30,17 +30,24 @@ type Service struct { mutex sync.Mutex lastReport api.FarmStatusReport + forcePoll chan struct{} // Send anything here to force a poll, if none is running yet. } +// NewService returns a 'farm status' service. Run its Run() function in a +// goroutine to make it actually do something. func NewService(persist PersistenceService, eventbus EventBus) *Service { - return &Service{ - persist: persist, - eventbus: eventbus, - mutex: sync.Mutex{}, + service := Service{ + persist: persist, + eventbus: eventbus, + mutex: sync.Mutex{}, + forcePoll: make(chan struct{}, 1), lastReport: api.FarmStatusReport{ Status: api.FarmStatusStarting, }, } + + eventbus.AddListener(&service) + return &service } // Run the farm status polling loop. @@ -54,10 +61,43 @@ func (s *Service) Run(ctx context.Context) { return case <-time.After(pollWait): s.poll(ctx) + case <-s.forcePoll: + s.poll(ctx) } } } +func (s *Service) OnEvent(topic eventbus.EventTopic, payload interface{}) { + forcePoll := false + eventSubject := "" + + switch event := payload.(type) { + case api.EventJobUpdate: + forcePoll = event.PreviousStatus != nil && *event.PreviousStatus != event.Status + eventSubject = "job" + case api.EventWorkerUpdate: + forcePoll = event.PreviousStatus != nil && *event.PreviousStatus != event.Status + eventSubject = "worker" + } + + if !forcePoll { + return + } + + log.Debug(). + Str("event", string(topic)). + Msgf("farm status: investigating after %s status change", eventSubject) + + // Polling queries the database, and thus can have a non-trivial duration. + // Better to run in the Run() goroutine. + select { + case s.forcePoll <- struct{}{}: + default: + // If sending to the channel fails, there is already a struct{}{} in + // there, and thus a poll will be triggered ASAP anyway. + } +} + // Report returns the last-known farm status report. // // It is updated every few seconds, from the Run() function. diff --git a/internal/manager/farmstatus/farmstatus_test.go b/internal/manager/farmstatus/farmstatus_test.go index 4ccc372a..f6eb7e52 100644 --- a/internal/manager/farmstatus/farmstatus_test.go +++ b/internal/manager/farmstatus/farmstatus_test.go @@ -225,6 +225,8 @@ func fixtures(t *testing.T) *Fixtures { ctx: context.Background(), } + // calling NewService() immediate registers as a listener with the event bus. + f.eventbus.EXPECT().AddListener(gomock.Any()) f.service = NewService(f.persist, f.eventbus) return &f diff --git a/internal/manager/farmstatus/interfaces.go b/internal/manager/farmstatus/interfaces.go index 38774a96..36798e17 100644 --- a/internal/manager/farmstatus/interfaces.go +++ b/internal/manager/farmstatus/interfaces.go @@ -19,6 +19,7 @@ type PersistenceService interface { var _ PersistenceService = (*persistence.DB)(nil) type EventBus interface { + AddListener(listener eventbus.Listener) BroadcastFarmStatusEvent(event api.EventFarmStatus) } diff --git a/internal/manager/farmstatus/mocks/interfaces_mock.gen.go b/internal/manager/farmstatus/mocks/interfaces_mock.gen.go index 820b6829..55099521 100644 --- a/internal/manager/farmstatus/mocks/interfaces_mock.gen.go +++ b/internal/manager/farmstatus/mocks/interfaces_mock.gen.go @@ -9,6 +9,7 @@ import ( reflect "reflect" gomock "github.com/golang/mock/gomock" + eventbus "projects.blender.org/studio/flamenco/internal/manager/eventbus" persistence "projects.blender.org/studio/flamenco/internal/manager/persistence" api "projects.blender.org/studio/flamenco/pkg/api" ) @@ -89,6 +90,18 @@ func (m *MockEventBus) EXPECT() *MockEventBusMockRecorder { return m.recorder } +// AddListener mocks base method. +func (m *MockEventBus) AddListener(arg0 eventbus.Listener) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddListener", arg0) +} + +// AddListener indicates an expected call of AddListener. +func (mr *MockEventBusMockRecorder) AddListener(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddListener", reflect.TypeOf((*MockEventBus)(nil).AddListener), arg0) +} + // BroadcastFarmStatusEvent mocks base method. func (m *MockEventBus) BroadcastFarmStatusEvent(arg0 api.EventFarmStatus) { m.ctrl.T.Helper()