Manager: force a poll of the farm status when a job/worker changes state

This introduces the concept of 'event listener', which is now used by
the farm status service to respond to events on the event bus.

This makes it possible to reduce the regular poll period from 5 to 30
seconds. That's now only necessary as backup, just in case events are
missed or otherwise things change without the event bus logic noticing.
This commit is contained in:
Sybren A. Stüvel 2024-03-01 22:26:05 +01:00
parent 9bfb53a7f6
commit c1a9b1e877
6 changed files with 85 additions and 5 deletions

View File

@ -10,17 +10,25 @@ type (
EventTopic string
)
// Listener is the interface for internal components that want to respond to events.
type Listener interface {
OnEvent(topic EventTopic, payload interface{})
}
// Forwarder is the interface for components that forward events to external systems.
type Forwarder interface {
Broadcast(topic EventTopic, payload interface{})
}
type Broker struct {
listeners []Listener
forwarders []Forwarder
mutex sync.Mutex
}
func NewBroker() *Broker {
return &Broker{
listeners: []Listener{},
forwarders: []Forwarder{},
mutex: sync.Mutex{},
}
@ -32,10 +40,20 @@ func (b *Broker) AddForwarder(forwarder Forwarder) {
b.forwarders = append(b.forwarders, forwarder)
}
func (b *Broker) AddListener(listener Listener) {
b.mutex.Lock()
defer b.mutex.Unlock()
b.listeners = append(b.listeners, listener)
}
func (b *Broker) broadcast(topic EventTopic, payload interface{}) {
b.mutex.Lock()
defer b.mutex.Unlock()
for _, listener := range b.listeners {
listener.OnEvent(topic, payload)
}
for _, forwarder := range b.forwarders {
forwarder.Broadcast(topic, payload)
}

View File

@ -24,6 +24,8 @@ const (
)
var socketIOEventTypes = map[string]string{
reflect.TypeOf(api.EventLifeCycle{}).Name(): "/lifecycle",
reflect.TypeOf(api.EventFarmStatus{}).Name(): "/status",
reflect.TypeOf(api.EventJobUpdate{}).Name(): "/jobs",
reflect.TypeOf(api.EventTaskUpdate{}).Name(): "/task",
reflect.TypeOf(api.EventLastRenderedUpdate{}).Name(): "/last-rendered",
@ -91,6 +93,10 @@ func (s *SocketIOForwarder) registerSIOEventHandlers() {
_ = sio.On(gosocketio.OnConnection, func(c *gosocketio.Channel) {
logger := sioLogger(c)
logger.Debug().Msg("socketIO: connected")
// All SocketIO connections get these events, regardless of their subscription.
_ = c.Join(string(TopicLifeCycle))
_ = c.Join(string(TopicFarmStatus))
})
// socket disconnection

View File

@ -20,7 +20,7 @@ const (
//
// Note that this indicates the time between polls, so between a poll
// operation being done, and the next one starting.
pollWait = 5 * time.Second
pollWait = 30 * time.Second
)
// Service keeps track of the overall farm status.
@ -30,17 +30,24 @@ type Service struct {
mutex sync.Mutex
lastReport api.FarmStatusReport
forcePoll chan struct{} // Send anything here to force a poll, if none is running yet.
}
// NewService returns a 'farm status' service. Run its Run() function in a
// goroutine to make it actually do something.
func NewService(persist PersistenceService, eventbus EventBus) *Service {
return &Service{
persist: persist,
eventbus: eventbus,
mutex: sync.Mutex{},
service := Service{
persist: persist,
eventbus: eventbus,
mutex: sync.Mutex{},
forcePoll: make(chan struct{}, 1),
lastReport: api.FarmStatusReport{
Status: api.FarmStatusStarting,
},
}
eventbus.AddListener(&service)
return &service
}
// Run the farm status polling loop.
@ -54,10 +61,43 @@ func (s *Service) Run(ctx context.Context) {
return
case <-time.After(pollWait):
s.poll(ctx)
case <-s.forcePoll:
s.poll(ctx)
}
}
}
func (s *Service) OnEvent(topic eventbus.EventTopic, payload interface{}) {
forcePoll := false
eventSubject := ""
switch event := payload.(type) {
case api.EventJobUpdate:
forcePoll = event.PreviousStatus != nil && *event.PreviousStatus != event.Status
eventSubject = "job"
case api.EventWorkerUpdate:
forcePoll = event.PreviousStatus != nil && *event.PreviousStatus != event.Status
eventSubject = "worker"
}
if !forcePoll {
return
}
log.Debug().
Str("event", string(topic)).
Msgf("farm status: investigating after %s status change", eventSubject)
// Polling queries the database, and thus can have a non-trivial duration.
// Better to run in the Run() goroutine.
select {
case s.forcePoll <- struct{}{}:
default:
// If sending to the channel fails, there is already a struct{}{} in
// there, and thus a poll will be triggered ASAP anyway.
}
}
// Report returns the last-known farm status report.
//
// It is updated every few seconds, from the Run() function.

View File

@ -225,6 +225,8 @@ func fixtures(t *testing.T) *Fixtures {
ctx: context.Background(),
}
// calling NewService() immediate registers as a listener with the event bus.
f.eventbus.EXPECT().AddListener(gomock.Any())
f.service = NewService(f.persist, f.eventbus)
return &f

View File

@ -19,6 +19,7 @@ type PersistenceService interface {
var _ PersistenceService = (*persistence.DB)(nil)
type EventBus interface {
AddListener(listener eventbus.Listener)
BroadcastFarmStatusEvent(event api.EventFarmStatus)
}

View File

@ -9,6 +9,7 @@ import (
reflect "reflect"
gomock "github.com/golang/mock/gomock"
eventbus "projects.blender.org/studio/flamenco/internal/manager/eventbus"
persistence "projects.blender.org/studio/flamenco/internal/manager/persistence"
api "projects.blender.org/studio/flamenco/pkg/api"
)
@ -89,6 +90,18 @@ func (m *MockEventBus) EXPECT() *MockEventBusMockRecorder {
return m.recorder
}
// AddListener mocks base method.
func (m *MockEventBus) AddListener(arg0 eventbus.Listener) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "AddListener", arg0)
}
// AddListener indicates an expected call of AddListener.
func (mr *MockEventBusMockRecorder) AddListener(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddListener", reflect.TypeOf((*MockEventBus)(nil).AddListener), arg0)
}
// BroadcastFarmStatusEvent mocks base method.
func (m *MockEventBus) BroadcastFarmStatusEvent(arg0 api.EventFarmStatus) {
m.ctrl.T.Helper()