From b922722614efd5f4091ae4a04e33a346fdcd547b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Mon, 13 Jun 2022 13:04:58 +0200 Subject: [PATCH] Manager: broadcast worker timeouts over SocketIO This way the web interface will also show timed-out workers. --- FEATURES.md | 2 +- cmd/flamenco-manager/main.go | 2 +- .../manager/timeout_checker/interfaces.go | 12 +++++- .../mocks/interfaces_mock.gen.go | 37 ++++++++++++++++++- .../timeout_checker/timeout_checker.go | 3 ++ .../timeout_checker/timeout_checker_test.go | 3 ++ internal/manager/timeout_checker/workers.go | 11 +++++- .../manager/timeout_checker/workers_test.go | 10 +++++ 8 files changed, 75 insertions(+), 5 deletions(-) diff --git a/FEATURES.md b/FEATURES.md index efae10c6..8e8183ec 100644 --- a/FEATURES.md +++ b/FEATURES.md @@ -46,7 +46,7 @@ Note that list is **not** in any specific order. Example: jobs in statuses `cancel-requested`, `requeueing`, etc. - [x] Task timeout monitoring - [ ] Worker blocklisting & failed task requeueing -- [ ] Worker timeout monitoring +- [x] Worker timeout monitoring - [ ] Last rendered image display - [ ] Web interface: Support actions on multiple selected things diff --git a/cmd/flamenco-manager/main.go b/cmd/flamenco-manager/main.go index f746d7fc..0b3439e0 100644 --- a/cmd/flamenco-manager/main.go +++ b/cmd/flamenco-manager/main.go @@ -117,7 +117,7 @@ func main() { timeoutChecker := timeout_checker.New( configService.Get().TaskTimeout, configService.Get().WorkerTimeout, - timeService, persist, taskStateMachine, logStorage) + timeService, persist, taskStateMachine, logStorage, webUpdater) installSignalHandler(mainCtxCancel) diff --git a/internal/manager/timeout_checker/interfaces.go b/internal/manager/timeout_checker/interfaces.go index 9f072e11..c8aee8ef 100644 --- a/internal/manager/timeout_checker/interfaces.go +++ b/internal/manager/timeout_checker/interfaces.go @@ -8,12 +8,13 @@ import ( "git.blender.org/flamenco/internal/manager/persistence" "git.blender.org/flamenco/internal/manager/task_state_machine" + "git.blender.org/flamenco/internal/manager/webupdates" "git.blender.org/flamenco/pkg/api" "github.com/rs/zerolog" ) // Generate mock implementations of these interfaces. -//go:generate go run github.com/golang/mock/mockgen -destination mocks/interfaces_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/timeout_checker PersistenceService,TaskStateMachine,LogStorage +//go:generate go run github.com/golang/mock/mockgen -destination mocks/interfaces_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/timeout_checker PersistenceService,TaskStateMachine,LogStorage,ChangeBroadcaster type PersistenceService interface { FetchTimedOutTasks(ctx context.Context, untouchedSince time.Time) ([]*persistence.Task, error) @@ -35,3 +36,12 @@ var _ TaskStateMachine = (*task_state_machine.StateMachine)(nil) type LogStorage interface { WriteTimestamped(logger zerolog.Logger, jobID, taskID string, logText string) error } + +// TODO: Refactor the way worker status changes are handled, so that this +// service doens't need to broadcast its own worker updates. +type ChangeBroadcaster interface { + BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate) +} + +// ChangeBroadcaster should be a subset of webupdates.BiDirComms. +var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil) diff --git a/internal/manager/timeout_checker/mocks/interfaces_mock.gen.go b/internal/manager/timeout_checker/mocks/interfaces_mock.gen.go index 77a981f1..9a7367fd 100644 --- a/internal/manager/timeout_checker/mocks/interfaces_mock.gen.go +++ b/internal/manager/timeout_checker/mocks/interfaces_mock.gen.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: git.blender.org/flamenco/internal/manager/timeout_checker (interfaces: PersistenceService,TaskStateMachine,LogStorage) +// Source: git.blender.org/flamenco/internal/manager/timeout_checker (interfaces: PersistenceService,TaskStateMachine,LogStorage,ChangeBroadcaster) // Package mocks is a generated GoMock package. package mocks @@ -169,3 +169,38 @@ func (mr *MockLogStorageMockRecorder) WriteTimestamped(arg0, arg1, arg2, arg3 in mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteTimestamped", reflect.TypeOf((*MockLogStorage)(nil).WriteTimestamped), arg0, arg1, arg2, arg3) } + +// MockChangeBroadcaster is a mock of ChangeBroadcaster interface. +type MockChangeBroadcaster struct { + ctrl *gomock.Controller + recorder *MockChangeBroadcasterMockRecorder +} + +// MockChangeBroadcasterMockRecorder is the mock recorder for MockChangeBroadcaster. +type MockChangeBroadcasterMockRecorder struct { + mock *MockChangeBroadcaster +} + +// NewMockChangeBroadcaster creates a new mock instance. +func NewMockChangeBroadcaster(ctrl *gomock.Controller) *MockChangeBroadcaster { + mock := &MockChangeBroadcaster{ctrl: ctrl} + mock.recorder = &MockChangeBroadcasterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockChangeBroadcaster) EXPECT() *MockChangeBroadcasterMockRecorder { + return m.recorder +} + +// BroadcastWorkerUpdate mocks base method. +func (m *MockChangeBroadcaster) BroadcastWorkerUpdate(arg0 api.SocketIOWorkerUpdate) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "BroadcastWorkerUpdate", arg0) +} + +// BroadcastWorkerUpdate indicates an expected call of BroadcastWorkerUpdate. +func (mr *MockChangeBroadcasterMockRecorder) BroadcastWorkerUpdate(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastWorkerUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastWorkerUpdate), arg0) +} diff --git a/internal/manager/timeout_checker/timeout_checker.go b/internal/manager/timeout_checker/timeout_checker.go index 91b6fbba..2d6ff7e0 100644 --- a/internal/manager/timeout_checker/timeout_checker.go +++ b/internal/manager/timeout_checker/timeout_checker.go @@ -26,6 +26,7 @@ type TimeoutChecker struct { persist PersistenceService taskStateMachine TaskStateMachine logStorage LogStorage + broadcaster ChangeBroadcaster } // New creates a new TimeoutChecker. @@ -36,6 +37,7 @@ func New( persist PersistenceService, taskStateMachine TaskStateMachine, logStorage LogStorage, + broadcaster ChangeBroadcaster, ) *TimeoutChecker { return &TimeoutChecker{ taskTimeout: taskTimeout, @@ -45,6 +47,7 @@ func New( persist: persist, taskStateMachine: taskStateMachine, logStorage: logStorage, + broadcaster: broadcaster, } } diff --git a/internal/manager/timeout_checker/timeout_checker_test.go b/internal/manager/timeout_checker/timeout_checker_test.go index d2c34a48..7d62f1ca 100644 --- a/internal/manager/timeout_checker/timeout_checker_test.go +++ b/internal/manager/timeout_checker/timeout_checker_test.go @@ -20,6 +20,7 @@ type TimeoutCheckerMocks struct { persist *mocks.MockPersistenceService taskStateMachine *mocks.MockTaskStateMachine logStorage *mocks.MockLogStorage + broadcaster *mocks.MockChangeBroadcaster ctx context.Context cancel context.CancelFunc @@ -43,6 +44,7 @@ func timeoutCheckerTestFixtures(t *testing.T) (*TimeoutChecker, func(), *Timeout persist: mocks.NewMockPersistenceService(mockCtrl), taskStateMachine: mocks.NewMockTaskStateMachine(mockCtrl), logStorage: mocks.NewMockLogStorage(mockCtrl), + broadcaster: mocks.NewMockChangeBroadcaster(mockCtrl), wg: new(sync.WaitGroup), } @@ -71,6 +73,7 @@ func timeoutCheckerTestFixtures(t *testing.T) (*TimeoutChecker, func(), *Timeout mocks.persist, mocks.taskStateMachine, mocks.logStorage, + mocks.broadcaster, ) return sm, finish, mocks } diff --git a/internal/manager/timeout_checker/workers.go b/internal/manager/timeout_checker/workers.go index 8bdce881..2d1a4cb2 100644 --- a/internal/manager/timeout_checker/workers.go +++ b/internal/manager/timeout_checker/workers.go @@ -45,6 +45,7 @@ func (ttc *TimeoutChecker) timeoutWorker(ctx context.Context, worker *persistenc Logger() logger.Warn().Msg("TimeoutChecker: worker timed out") + prevStatus := worker.Status worker.Status = api.WorkerStatusError worker.StatusChangeClear() @@ -58,5 +59,13 @@ func (ttc *TimeoutChecker) timeoutWorker(ctx context.Context, worker *persistenc logger.Error().Err(err).Msg("TimeoutChecker: error re-queueing tasks of timed-out worker") } - // TODO: broadcast worker change via SocketIO + // Broadcast worker change via SocketIO + ttc.broadcaster.BroadcastWorkerUpdate(api.SocketIOWorkerUpdate{ + Id: worker.UUID, + Nickname: worker.Name, + PreviousStatus: &prevStatus, + Status: api.WorkerStatusError, + Updated: worker.UpdatedAt, + Version: worker.Software, + }) } diff --git a/internal/manager/timeout_checker/workers_test.go b/internal/manager/timeout_checker/workers_test.go index 713d1a3d..c95eb44e 100644 --- a/internal/manager/timeout_checker/workers_test.go +++ b/internal/manager/timeout_checker/workers_test.go @@ -53,6 +53,16 @@ func TestWorkerTimeout(t *testing.T) { persistedWorker.StatusChangeClear() mocks.persist.EXPECT().SaveWorker(mocks.ctx, &persistedWorker).Return(nil) + prevStatus := worker.Status + mocks.broadcaster.EXPECT().BroadcastWorkerUpdate(api.SocketIOWorkerUpdate{ + Id: worker.UUID, + Nickname: worker.Name, + PreviousStatus: &prevStatus, + Status: api.WorkerStatusError, + Updated: persistedWorker.UpdatedAt, + Version: persistedWorker.Software, + }) + // All the timeouts should be handled after the initial sleep. mocks.clock.Add(timeoutInitialSleep) }