From c1a9b1e877a9c1f983b2dcaa070e325c85c3978a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Fri, 1 Mar 2024 22:26:05 +0100 Subject: [PATCH] Manager: force a poll of the farm status when a job/worker changes state This introduces the concept of 'event listener', which is now used by the farm status service to respond to events on the event bus. This makes it possible to reduce the regular poll period from 5 to 30 seconds. That's now only necessary as backup, just in case events are missed or otherwise things change without the event bus logic noticing. --- internal/manager/eventbus/eventbus.go | 18 +++++++ internal/manager/eventbus/socketio.go | 6 +++ internal/manager/farmstatus/farmstatus.go | 50 +++++++++++++++++-- .../manager/farmstatus/farmstatus_test.go | 2 + internal/manager/farmstatus/interfaces.go | 1 + .../farmstatus/mocks/interfaces_mock.gen.go | 13 +++++ 6 files changed, 85 insertions(+), 5 deletions(-) diff --git a/internal/manager/eventbus/eventbus.go b/internal/manager/eventbus/eventbus.go index 8d533dd6..27392020 100644 --- a/internal/manager/eventbus/eventbus.go +++ b/internal/manager/eventbus/eventbus.go @@ -10,17 +10,25 @@ type ( EventTopic string ) +// Listener is the interface for internal components that want to respond to events. +type Listener interface { + OnEvent(topic EventTopic, payload interface{}) +} + +// Forwarder is the interface for components that forward events to external systems. type Forwarder interface { Broadcast(topic EventTopic, payload interface{}) } type Broker struct { + listeners []Listener forwarders []Forwarder mutex sync.Mutex } func NewBroker() *Broker { return &Broker{ + listeners: []Listener{}, forwarders: []Forwarder{}, mutex: sync.Mutex{}, } @@ -32,10 +40,20 @@ func (b *Broker) AddForwarder(forwarder Forwarder) { b.forwarders = append(b.forwarders, forwarder) } +func (b *Broker) AddListener(listener Listener) { + b.mutex.Lock() + defer b.mutex.Unlock() + b.listeners = append(b.listeners, listener) +} + func (b *Broker) broadcast(topic EventTopic, payload interface{}) { b.mutex.Lock() defer b.mutex.Unlock() + for _, listener := range b.listeners { + listener.OnEvent(topic, payload) + } + for _, forwarder := range b.forwarders { forwarder.Broadcast(topic, payload) } diff --git a/internal/manager/eventbus/socketio.go b/internal/manager/eventbus/socketio.go index 1ff4e190..57f59405 100644 --- a/internal/manager/eventbus/socketio.go +++ b/internal/manager/eventbus/socketio.go @@ -24,6 +24,8 @@ const ( ) var socketIOEventTypes = map[string]string{ + reflect.TypeOf(api.EventLifeCycle{}).Name(): "/lifecycle", + reflect.TypeOf(api.EventFarmStatus{}).Name(): "/status", reflect.TypeOf(api.EventJobUpdate{}).Name(): "/jobs", reflect.TypeOf(api.EventTaskUpdate{}).Name(): "/task", reflect.TypeOf(api.EventLastRenderedUpdate{}).Name(): "/last-rendered", @@ -91,6 +93,10 @@ func (s *SocketIOForwarder) registerSIOEventHandlers() { _ = sio.On(gosocketio.OnConnection, func(c *gosocketio.Channel) { logger := sioLogger(c) logger.Debug().Msg("socketIO: connected") + + // All SocketIO connections get these events, regardless of their subscription. + _ = c.Join(string(TopicLifeCycle)) + _ = c.Join(string(TopicFarmStatus)) }) // socket disconnection diff --git a/internal/manager/farmstatus/farmstatus.go b/internal/manager/farmstatus/farmstatus.go index 0720db48..82c440fd 100644 --- a/internal/manager/farmstatus/farmstatus.go +++ b/internal/manager/farmstatus/farmstatus.go @@ -20,7 +20,7 @@ const ( // // Note that this indicates the time between polls, so between a poll // operation being done, and the next one starting. - pollWait = 5 * time.Second + pollWait = 30 * time.Second ) // Service keeps track of the overall farm status. @@ -30,17 +30,24 @@ type Service struct { mutex sync.Mutex lastReport api.FarmStatusReport + forcePoll chan struct{} // Send anything here to force a poll, if none is running yet. } +// NewService returns a 'farm status' service. Run its Run() function in a +// goroutine to make it actually do something. func NewService(persist PersistenceService, eventbus EventBus) *Service { - return &Service{ - persist: persist, - eventbus: eventbus, - mutex: sync.Mutex{}, + service := Service{ + persist: persist, + eventbus: eventbus, + mutex: sync.Mutex{}, + forcePoll: make(chan struct{}, 1), lastReport: api.FarmStatusReport{ Status: api.FarmStatusStarting, }, } + + eventbus.AddListener(&service) + return &service } // Run the farm status polling loop. @@ -54,10 +61,43 @@ func (s *Service) Run(ctx context.Context) { return case <-time.After(pollWait): s.poll(ctx) + case <-s.forcePoll: + s.poll(ctx) } } } +func (s *Service) OnEvent(topic eventbus.EventTopic, payload interface{}) { + forcePoll := false + eventSubject := "" + + switch event := payload.(type) { + case api.EventJobUpdate: + forcePoll = event.PreviousStatus != nil && *event.PreviousStatus != event.Status + eventSubject = "job" + case api.EventWorkerUpdate: + forcePoll = event.PreviousStatus != nil && *event.PreviousStatus != event.Status + eventSubject = "worker" + } + + if !forcePoll { + return + } + + log.Debug(). + Str("event", string(topic)). + Msgf("farm status: investigating after %s status change", eventSubject) + + // Polling queries the database, and thus can have a non-trivial duration. + // Better to run in the Run() goroutine. + select { + case s.forcePoll <- struct{}{}: + default: + // If sending to the channel fails, there is already a struct{}{} in + // there, and thus a poll will be triggered ASAP anyway. + } +} + // Report returns the last-known farm status report. // // It is updated every few seconds, from the Run() function. diff --git a/internal/manager/farmstatus/farmstatus_test.go b/internal/manager/farmstatus/farmstatus_test.go index 4ccc372a..f6eb7e52 100644 --- a/internal/manager/farmstatus/farmstatus_test.go +++ b/internal/manager/farmstatus/farmstatus_test.go @@ -225,6 +225,8 @@ func fixtures(t *testing.T) *Fixtures { ctx: context.Background(), } + // calling NewService() immediate registers as a listener with the event bus. + f.eventbus.EXPECT().AddListener(gomock.Any()) f.service = NewService(f.persist, f.eventbus) return &f diff --git a/internal/manager/farmstatus/interfaces.go b/internal/manager/farmstatus/interfaces.go index 38774a96..36798e17 100644 --- a/internal/manager/farmstatus/interfaces.go +++ b/internal/manager/farmstatus/interfaces.go @@ -19,6 +19,7 @@ type PersistenceService interface { var _ PersistenceService = (*persistence.DB)(nil) type EventBus interface { + AddListener(listener eventbus.Listener) BroadcastFarmStatusEvent(event api.EventFarmStatus) } diff --git a/internal/manager/farmstatus/mocks/interfaces_mock.gen.go b/internal/manager/farmstatus/mocks/interfaces_mock.gen.go index 820b6829..55099521 100644 --- a/internal/manager/farmstatus/mocks/interfaces_mock.gen.go +++ b/internal/manager/farmstatus/mocks/interfaces_mock.gen.go @@ -9,6 +9,7 @@ import ( reflect "reflect" gomock "github.com/golang/mock/gomock" + eventbus "projects.blender.org/studio/flamenco/internal/manager/eventbus" persistence "projects.blender.org/studio/flamenco/internal/manager/persistence" api "projects.blender.org/studio/flamenco/pkg/api" ) @@ -89,6 +90,18 @@ func (m *MockEventBus) EXPECT() *MockEventBusMockRecorder { return m.recorder } +// AddListener mocks base method. +func (m *MockEventBus) AddListener(arg0 eventbus.Listener) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddListener", arg0) +} + +// AddListener indicates an expected call of AddListener. +func (mr *MockEventBusMockRecorder) AddListener(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddListener", reflect.TypeOf((*MockEventBus)(nil).AddListener), arg0) +} + // BroadcastFarmStatusEvent mocks base method. func (m *MockEventBus) BroadcastFarmStatusEvent(arg0 api.EventFarmStatus) { m.ctrl.T.Helper()