Manager: broadcast worker timeouts over SocketIO

This way the web interface will also show timed-out workers.
This commit is contained in:
Sybren A. Stüvel 2022-06-13 13:04:58 +02:00
parent 75ca0e652e
commit b922722614
8 changed files with 75 additions and 5 deletions

View File

@ -46,7 +46,7 @@ Note that list is **not** in any specific order.
Example: jobs in statuses `cancel-requested`, `requeueing`, etc. Example: jobs in statuses `cancel-requested`, `requeueing`, etc.
- [x] Task timeout monitoring - [x] Task timeout monitoring
- [ ] Worker blocklisting & failed task requeueing - [ ] Worker blocklisting & failed task requeueing
- [ ] Worker timeout monitoring - [x] Worker timeout monitoring
- [ ] Last rendered image display - [ ] Last rendered image display
- [ ] Web interface: Support actions on multiple selected things - [ ] Web interface: Support actions on multiple selected things

View File

@ -117,7 +117,7 @@ func main() {
timeoutChecker := timeout_checker.New( timeoutChecker := timeout_checker.New(
configService.Get().TaskTimeout, configService.Get().TaskTimeout,
configService.Get().WorkerTimeout, configService.Get().WorkerTimeout,
timeService, persist, taskStateMachine, logStorage) timeService, persist, taskStateMachine, logStorage, webUpdater)
installSignalHandler(mainCtxCancel) installSignalHandler(mainCtxCancel)

View File

@ -8,12 +8,13 @@ import (
"git.blender.org/flamenco/internal/manager/persistence" "git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/internal/manager/task_state_machine" "git.blender.org/flamenco/internal/manager/task_state_machine"
"git.blender.org/flamenco/internal/manager/webupdates"
"git.blender.org/flamenco/pkg/api" "git.blender.org/flamenco/pkg/api"
"github.com/rs/zerolog" "github.com/rs/zerolog"
) )
// Generate mock implementations of these interfaces. // Generate mock implementations of these interfaces.
//go:generate go run github.com/golang/mock/mockgen -destination mocks/interfaces_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/timeout_checker PersistenceService,TaskStateMachine,LogStorage //go:generate go run github.com/golang/mock/mockgen -destination mocks/interfaces_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/timeout_checker PersistenceService,TaskStateMachine,LogStorage,ChangeBroadcaster
type PersistenceService interface { type PersistenceService interface {
FetchTimedOutTasks(ctx context.Context, untouchedSince time.Time) ([]*persistence.Task, error) FetchTimedOutTasks(ctx context.Context, untouchedSince time.Time) ([]*persistence.Task, error)
@ -35,3 +36,12 @@ var _ TaskStateMachine = (*task_state_machine.StateMachine)(nil)
type LogStorage interface { type LogStorage interface {
WriteTimestamped(logger zerolog.Logger, jobID, taskID string, logText string) error WriteTimestamped(logger zerolog.Logger, jobID, taskID string, logText string) error
} }
// TODO: Refactor the way worker status changes are handled, so that this
// service doens't need to broadcast its own worker updates.
type ChangeBroadcaster interface {
BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate)
}
// ChangeBroadcaster should be a subset of webupdates.BiDirComms.
var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil)

View File

@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT. // Code generated by MockGen. DO NOT EDIT.
// Source: git.blender.org/flamenco/internal/manager/timeout_checker (interfaces: PersistenceService,TaskStateMachine,LogStorage) // Source: git.blender.org/flamenco/internal/manager/timeout_checker (interfaces: PersistenceService,TaskStateMachine,LogStorage,ChangeBroadcaster)
// Package mocks is a generated GoMock package. // Package mocks is a generated GoMock package.
package mocks package mocks
@ -169,3 +169,38 @@ func (mr *MockLogStorageMockRecorder) WriteTimestamped(arg0, arg1, arg2, arg3 in
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteTimestamped", reflect.TypeOf((*MockLogStorage)(nil).WriteTimestamped), arg0, arg1, arg2, arg3) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteTimestamped", reflect.TypeOf((*MockLogStorage)(nil).WriteTimestamped), arg0, arg1, arg2, arg3)
} }
// MockChangeBroadcaster is a mock of ChangeBroadcaster interface.
type MockChangeBroadcaster struct {
ctrl *gomock.Controller
recorder *MockChangeBroadcasterMockRecorder
}
// MockChangeBroadcasterMockRecorder is the mock recorder for MockChangeBroadcaster.
type MockChangeBroadcasterMockRecorder struct {
mock *MockChangeBroadcaster
}
// NewMockChangeBroadcaster creates a new mock instance.
func NewMockChangeBroadcaster(ctrl *gomock.Controller) *MockChangeBroadcaster {
mock := &MockChangeBroadcaster{ctrl: ctrl}
mock.recorder = &MockChangeBroadcasterMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockChangeBroadcaster) EXPECT() *MockChangeBroadcasterMockRecorder {
return m.recorder
}
// BroadcastWorkerUpdate mocks base method.
func (m *MockChangeBroadcaster) BroadcastWorkerUpdate(arg0 api.SocketIOWorkerUpdate) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "BroadcastWorkerUpdate", arg0)
}
// BroadcastWorkerUpdate indicates an expected call of BroadcastWorkerUpdate.
func (mr *MockChangeBroadcasterMockRecorder) BroadcastWorkerUpdate(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastWorkerUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastWorkerUpdate), arg0)
}

View File

@ -26,6 +26,7 @@ type TimeoutChecker struct {
persist PersistenceService persist PersistenceService
taskStateMachine TaskStateMachine taskStateMachine TaskStateMachine
logStorage LogStorage logStorage LogStorage
broadcaster ChangeBroadcaster
} }
// New creates a new TimeoutChecker. // New creates a new TimeoutChecker.
@ -36,6 +37,7 @@ func New(
persist PersistenceService, persist PersistenceService,
taskStateMachine TaskStateMachine, taskStateMachine TaskStateMachine,
logStorage LogStorage, logStorage LogStorage,
broadcaster ChangeBroadcaster,
) *TimeoutChecker { ) *TimeoutChecker {
return &TimeoutChecker{ return &TimeoutChecker{
taskTimeout: taskTimeout, taskTimeout: taskTimeout,
@ -45,6 +47,7 @@ func New(
persist: persist, persist: persist,
taskStateMachine: taskStateMachine, taskStateMachine: taskStateMachine,
logStorage: logStorage, logStorage: logStorage,
broadcaster: broadcaster,
} }
} }

View File

@ -20,6 +20,7 @@ type TimeoutCheckerMocks struct {
persist *mocks.MockPersistenceService persist *mocks.MockPersistenceService
taskStateMachine *mocks.MockTaskStateMachine taskStateMachine *mocks.MockTaskStateMachine
logStorage *mocks.MockLogStorage logStorage *mocks.MockLogStorage
broadcaster *mocks.MockChangeBroadcaster
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
@ -43,6 +44,7 @@ func timeoutCheckerTestFixtures(t *testing.T) (*TimeoutChecker, func(), *Timeout
persist: mocks.NewMockPersistenceService(mockCtrl), persist: mocks.NewMockPersistenceService(mockCtrl),
taskStateMachine: mocks.NewMockTaskStateMachine(mockCtrl), taskStateMachine: mocks.NewMockTaskStateMachine(mockCtrl),
logStorage: mocks.NewMockLogStorage(mockCtrl), logStorage: mocks.NewMockLogStorage(mockCtrl),
broadcaster: mocks.NewMockChangeBroadcaster(mockCtrl),
wg: new(sync.WaitGroup), wg: new(sync.WaitGroup),
} }
@ -71,6 +73,7 @@ func timeoutCheckerTestFixtures(t *testing.T) (*TimeoutChecker, func(), *Timeout
mocks.persist, mocks.persist,
mocks.taskStateMachine, mocks.taskStateMachine,
mocks.logStorage, mocks.logStorage,
mocks.broadcaster,
) )
return sm, finish, mocks return sm, finish, mocks
} }

View File

@ -45,6 +45,7 @@ func (ttc *TimeoutChecker) timeoutWorker(ctx context.Context, worker *persistenc
Logger() Logger()
logger.Warn().Msg("TimeoutChecker: worker timed out") logger.Warn().Msg("TimeoutChecker: worker timed out")
prevStatus := worker.Status
worker.Status = api.WorkerStatusError worker.Status = api.WorkerStatusError
worker.StatusChangeClear() worker.StatusChangeClear()
@ -58,5 +59,13 @@ func (ttc *TimeoutChecker) timeoutWorker(ctx context.Context, worker *persistenc
logger.Error().Err(err).Msg("TimeoutChecker: error re-queueing tasks of timed-out worker") logger.Error().Err(err).Msg("TimeoutChecker: error re-queueing tasks of timed-out worker")
} }
// TODO: broadcast worker change via SocketIO // Broadcast worker change via SocketIO
ttc.broadcaster.BroadcastWorkerUpdate(api.SocketIOWorkerUpdate{
Id: worker.UUID,
Nickname: worker.Name,
PreviousStatus: &prevStatus,
Status: api.WorkerStatusError,
Updated: worker.UpdatedAt,
Version: worker.Software,
})
} }

View File

@ -53,6 +53,16 @@ func TestWorkerTimeout(t *testing.T) {
persistedWorker.StatusChangeClear() persistedWorker.StatusChangeClear()
mocks.persist.EXPECT().SaveWorker(mocks.ctx, &persistedWorker).Return(nil) mocks.persist.EXPECT().SaveWorker(mocks.ctx, &persistedWorker).Return(nil)
prevStatus := worker.Status
mocks.broadcaster.EXPECT().BroadcastWorkerUpdate(api.SocketIOWorkerUpdate{
Id: worker.UUID,
Nickname: worker.Name,
PreviousStatus: &prevStatus,
Status: api.WorkerStatusError,
Updated: persistedWorker.UpdatedAt,
Version: persistedWorker.Software,
})
// All the timeouts should be handled after the initial sleep. // All the timeouts should be handled after the initial sleep.
mocks.clock.Add(timeoutInitialSleep) mocks.clock.Add(timeoutInitialSleep)
} }