From 819cad1d18df2540625947eb2ed7c240d42f1fa7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Thu, 9 Jun 2022 16:49:48 +0200 Subject: [PATCH] Manager: move broadcasting of task logs via SocketIO to task log service To ensure all task logs also get broadcast via SocketIO, the responsibility has moved from the `api_impl` to the `task_logs` package. --- cmd/flamenco-manager/main.go | 2 +- internal/manager/api_impl/interfaces.go | 4 +- .../api_impl/mocks/api_impl_mock.gen.go | 12 ---- internal/manager/api_impl/workers.go | 20 +----- internal/manager/api_impl/workers_test.go | 7 -- .../task_logs/mocks/interfaces_mock.gen.go | 47 ++++++++++++++ internal/manager/task_logs/task_logs.go | 46 +++++++++++-- internal/manager/task_logs/task_logs_test.go | 65 ++++++++++++++----- 8 files changed, 144 insertions(+), 59 deletions(-) create mode 100644 internal/manager/task_logs/mocks/interfaces_mock.gen.go diff --git a/cmd/flamenco-manager/main.go b/cmd/flamenco-manager/main.go index 4112f07d..2f528103 100644 --- a/cmd/flamenco-manager/main.go +++ b/cmd/flamenco-manager/main.go @@ -159,7 +159,7 @@ func buildFlamencoAPI( if err != nil { log.Fatal().Err(err).Msg("error loading job compilers") } - logStorage := task_logs.NewStorage(configService.Get().TaskLogsPath) + logStorage := task_logs.NewStorage(configService.Get().TaskLogsPath, webUpdater) shamanServer := shaman.NewServer(configService.Get().Shaman, nil) flamenco := api_impl.NewFlamenco( compiler, persist, webUpdater, logStorage, configService, diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index 0622333d..fd6758c0 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -72,7 +72,9 @@ type ChangeBroadcaster interface { // after the job's tasks have been created, and thus there is no need for a // separate broadcast per task. - BroadcastTaskLogUpdate(taskLogUpdate api.SocketIOTaskLogUpdate) + // Note that there is no call to BoardcastTaskLogUpdate. It's the + // responsibility of `LogStorage.Write` to broadcast the changes to SocketIO + // clients. BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate) BroadcastNewWorker(workerUpdate api.SocketIOWorkerUpdate) diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index 4358ca2c..bc603222 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -305,18 +305,6 @@ func (mr *MockChangeBroadcasterMockRecorder) BroadcastNewWorker(arg0 interface{} return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastNewWorker", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastNewWorker), arg0) } -// BroadcastTaskLogUpdate mocks base method. -func (m *MockChangeBroadcaster) BroadcastTaskLogUpdate(arg0 api.SocketIOTaskLogUpdate) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "BroadcastTaskLogUpdate", arg0) -} - -// BroadcastTaskLogUpdate indicates an expected call of BroadcastTaskLogUpdate. -func (mr *MockChangeBroadcasterMockRecorder) BroadcastTaskLogUpdate(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastTaskLogUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastTaskLogUpdate), arg0) -} - // BroadcastWorkerUpdate mocks base method. func (m *MockChangeBroadcaster) BroadcastWorkerUpdate(arg0 api.SocketIOWorkerUpdate) { m.ctrl.T.Helper() diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go index 71cf5bc2..af9c9401 100644 --- a/internal/manager/api_impl/workers.go +++ b/internal/manager/api_impl/workers.go @@ -441,7 +441,8 @@ func (f *Flamenco) doTaskUpdate( } if update.Log != nil { - f.taskLogAppend(logger, dbTask, *update.Log) + // Errors writing the log to disk are already logged by logStorage, and can be safely ignored here. + _ = f.logStorage.Write(logger, dbTask.Job.UUID, dbTask.UUID, *update.Log) } // Any error updating the status is more important than an error updating the @@ -465,25 +466,10 @@ func (f *Flamenco) workerPingedTask( return nil } -// taskLogAppend appends a chunk of log lines to the task's log, and broadcasts it over SocketIO. -func (f *Flamenco) taskLogAppend(logger zerolog.Logger, dbTask *persistence.Task, logChunk string) { - // Errors writing the log to file should be logged in our own logging - // system, but shouldn't ripple up. As such, `err` is not returned to - // the caller. - err := f.logStorage.Write(logger, dbTask.Job.UUID, dbTask.UUID, logChunk) - if err != nil { - logger.Error().Err(err).Msg("error writing task log") - } - - // Broadcast the task log to SocketIO clients. - taskUpdate := webupdates.NewTaskLogUpdate(dbTask.UUID, logChunk) - f.broadcaster.BroadcastTaskLogUpdate(taskUpdate) -} - // taskLogAppendTimestamped writes the given log text, prefixed with the current date & time, to the task's log. func (f *Flamenco) taskLogAppendTimestamped(logger zerolog.Logger, dbTask *persistence.Task, logText string) { now := f.clock.Now().Format(time.RFC3339) - f.taskLogAppend(logger, dbTask, now+" "+logText) + _ = f.logStorage.Write(logger, dbTask.Job.UUID, dbTask.UUID, now+" "+logText) } func (f *Flamenco) MayWorkerRun(e echo.Context, taskID string) error { diff --git a/internal/manager/api_impl/workers_test.go b/internal/manager/api_impl/workers_test.go index ace58ec7..8098ed42 100644 --- a/internal/manager/api_impl/workers_test.go +++ b/internal/manager/api_impl/workers_test.go @@ -38,7 +38,6 @@ func TestTaskScheduleHappy(t *testing.T) { mf.logStorage.EXPECT().Write(gomock.Any(), job.UUID, task.UUID, "2022-06-09T11:14:41+02:00 Task assigned to worker дрон (e7632d62-c3b8-4af0-9e78-01752928952c)\n") - mf.broadcaster.EXPECT().BroadcastTaskLogUpdate(gomock.Any()) // The task log should be updated; this test assumes the contents are ok. err := mf.flamenco.ScheduleTask(echo) assert.NoError(t, err) @@ -171,8 +170,6 @@ func TestWorkerSignoffTaskRequeue(t *testing.T) { logMsg := "2022-06-09T11:14:41+02:00 Task was requeued by Manager because the worker assigned to it signed off.\n" mf.logStorage.EXPECT().Write(gomock.Any(), job.UUID, task1.UUID, logMsg) mf.logStorage.EXPECT().Write(gomock.Any(), job.UUID, task2.UUID, logMsg) - mf.broadcaster.EXPECT().BroadcastTaskLogUpdate(api.SocketIOTaskLogUpdate{TaskId: task1.UUID, Log: logMsg}) - mf.broadcaster.EXPECT().BroadcastTaskLogUpdate(api.SocketIOTaskLogUpdate{TaskId: task2.UUID, Log: logMsg}) // Expect worker to be saved as 'offline'. mf.persistence.EXPECT(). @@ -389,10 +386,6 @@ func TestTaskUpdate(t *testing.T) { // Expect the log to be written and broadcast over SocketIO. mf.logStorage.EXPECT().Write(gomock.Any(), jobID, taskID, "line1\nline2\n") - mf.broadcaster.EXPECT().BroadcastTaskLogUpdate(api.SocketIOTaskLogUpdate{ - TaskId: taskID, - Log: "line1\nline2\n", - }) // Expect a 'touch' of the task. var touchedTask persistence.Task diff --git a/internal/manager/task_logs/mocks/interfaces_mock.gen.go b/internal/manager/task_logs/mocks/interfaces_mock.gen.go new file mode 100644 index 00000000..51308612 --- /dev/null +++ b/internal/manager/task_logs/mocks/interfaces_mock.gen.go @@ -0,0 +1,47 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: git.blender.org/flamenco/internal/manager/task_logs (interfaces: ChangeBroadcaster) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + reflect "reflect" + + api "git.blender.org/flamenco/pkg/api" + gomock "github.com/golang/mock/gomock" +) + +// MockChangeBroadcaster is a mock of ChangeBroadcaster interface. +type MockChangeBroadcaster struct { + ctrl *gomock.Controller + recorder *MockChangeBroadcasterMockRecorder +} + +// MockChangeBroadcasterMockRecorder is the mock recorder for MockChangeBroadcaster. +type MockChangeBroadcasterMockRecorder struct { + mock *MockChangeBroadcaster +} + +// NewMockChangeBroadcaster creates a new mock instance. +func NewMockChangeBroadcaster(ctrl *gomock.Controller) *MockChangeBroadcaster { + mock := &MockChangeBroadcaster{ctrl: ctrl} + mock.recorder = &MockChangeBroadcasterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockChangeBroadcaster) EXPECT() *MockChangeBroadcasterMockRecorder { + return m.recorder +} + +// BroadcastTaskLogUpdate mocks base method. +func (m *MockChangeBroadcaster) BroadcastTaskLogUpdate(arg0 api.SocketIOTaskLogUpdate) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "BroadcastTaskLogUpdate", arg0) +} + +// BroadcastTaskLogUpdate indicates an expected call of BroadcastTaskLogUpdate. +func (mr *MockChangeBroadcasterMockRecorder) BroadcastTaskLogUpdate(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastTaskLogUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastTaskLogUpdate), arg0) +} diff --git a/internal/manager/task_logs/task_logs.go b/internal/manager/task_logs/task_logs.go index c0799a1e..bfd08f03 100644 --- a/internal/manager/task_logs/task_logs.go +++ b/internal/manager/task_logs/task_logs.go @@ -11,6 +11,8 @@ import ( "path/filepath" "sync" + "git.blender.org/flamenco/internal/manager/webupdates" + "git.blender.org/flamenco/pkg/api" "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) @@ -24,13 +26,29 @@ const ( type Storage struct { BasePath string // Directory where task logs are stored. + broadcaster ChangeBroadcaster + // Locks to only allow one goroutine at a time to handle the logs of a certain task. mutex *sync.Mutex taskLocks map[string]*sync.Mutex } +// Generate mock implementations of these interfaces. +//go:generate go run github.com/golang/mock/mockgen -destination mocks/interfaces_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/task_logs ChangeBroadcaster + +type ChangeBroadcaster interface { + // BroadcastTaskLogUpdate sends the task log update to SocketIO clients. + BroadcastTaskLogUpdate(taskLogUpdate api.SocketIOTaskLogUpdate) +} + +// ChangeBroadcaster should be a subset of webupdates.BiDirComms +var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil) + // NewStorage creates a new log storage rooted at `basePath`. -func NewStorage(basePath string) *Storage { +func NewStorage( + basePath string, + broadcaster ChangeBroadcaster, +) *Storage { if !filepath.IsAbs(basePath) { absPath, err := filepath.Abs(basePath) if err != nil { @@ -44,13 +62,33 @@ func NewStorage(basePath string) *Storage { Msg("task logs") return &Storage{ - BasePath: basePath, - mutex: new(sync.Mutex), - taskLocks: make(map[string]*sync.Mutex), + BasePath: basePath, + broadcaster: broadcaster, + mutex: new(sync.Mutex), + taskLocks: make(map[string]*sync.Mutex), } } +// Write appends text to a task's log file, and broadcasts the log lines via SocketIO. func (s *Storage) Write(logger zerolog.Logger, jobID, taskID string, logText string) error { + if err := s.writeToDisk(logger, jobID, taskID, logText); err != nil { + return err + } + + // Broadcast the task log to SocketIO clients. + taskUpdate := webupdates.NewTaskLogUpdate(taskID, logText) + s.broadcaster.BroadcastTaskLogUpdate(taskUpdate) + return nil +} + +// // Write appends text, prefixed with the current date & time, to a task's log file, +// // and broadcasts the log lines via SocketIO. +// func (s *Storage) WriteTimestamped(logger zerolog.Logger, jobID, taskID string, logText string) error { +// now := s.clock.Now().Format(time.RFC3339) +// return s.Write(logger, jobID, taskID, now+" "+logText) +// } + +func (s *Storage) writeToDisk(logger zerolog.Logger, jobID, taskID string, logText string) error { // Shortcut to avoid creating an empty log file. It also solves an // index out of bounds error further down when we check the last character. if logText == "" { diff --git a/internal/manager/task_logs/task_logs_test.go b/internal/manager/task_logs/task_logs_test.go index 75c08797..a70e28f2 100644 --- a/internal/manager/task_logs/task_logs_test.go +++ b/internal/manager/task_logs/task_logs_test.go @@ -11,22 +11,20 @@ import ( "sync" "testing" + "git.blender.org/flamenco/internal/manager/task_logs/mocks" + "github.com/benbjohnson/clock" + "github.com/golang/mock/gomock" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/stretchr/testify/assert" ) -func tempStorage() *Storage { - temppath, err := ioutil.TempDir("", "testlogs") - if err != nil { - panic(err) - } - return NewStorage(temppath) -} - func TestLogWriting(t *testing.T) { - s := tempStorage() - defer os.RemoveAll(s.BasePath) + s, finish, mocks := taskLogsTestFixtures(t) + defer finish() + + // Expect broadcastst for each call to s.Write() + mocks.broadcaster.EXPECT().BroadcastTaskLogUpdate(gomock.Any()).Times(2) err := s.Write(zerolog.Nop(), "25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c", @@ -52,8 +50,10 @@ func TestLogWriting(t *testing.T) { } func TestLogRotation(t *testing.T) { - s := tempStorage() - defer os.RemoveAll(s.BasePath) + s, finish, mocks := taskLogsTestFixtures(t) + defer finish() + + mocks.broadcaster.EXPECT().BroadcastTaskLogUpdate(gomock.Any()) err := s.Write(zerolog.Nop(), "25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c", @@ -81,12 +81,15 @@ func TestLogRotation(t *testing.T) { } func TestLogTail(t *testing.T) { - s := tempStorage() - defer os.RemoveAll(s.BasePath) + s, finish, mocks := taskLogsTestFixtures(t) + defer finish() jobID := "25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c" taskID := "20ff9d06-53ec-4019-9e2e-1774f05f170a" + // Expect broadcastst for each call to s.Write() + mocks.broadcaster.EXPECT().BroadcastTaskLogUpdate(gomock.Any()).Times(3) + contents, err := s.Tail(jobID, taskID) assert.ErrorIs(t, err, os.ErrNotExist) assert.Equal(t, "", contents) @@ -142,9 +145,8 @@ func TestLogTail(t *testing.T) { } func TestLogWritingParallel(t *testing.T) { - s := tempStorage() - defer os.RemoveAll(s.BasePath) - // defer t.Errorf("not removing %s", s.BasePath) + s, finish, mocks := taskLogsTestFixtures(t) + defer finish() numGoroutines := 1000 // How many goroutines run in parallel. runLength := 100 // How many characters are logged, per goroutine. @@ -154,6 +156,8 @@ func TestLogWritingParallel(t *testing.T) { jobID := "6d9a05a1-261e-4f6f-93b0-8c4f6b6d500d" taskID := "d19888cc-c389-4a24-aebf-8458ababdb02" + mocks.broadcaster.EXPECT().BroadcastTaskLogUpdate(gomock.Any()).Times(numGoroutines) + for i := 0; i < numGoroutines; i++ { // Write lines of 100 characters to the task log. Each goroutine writes a // different character, starting at 'A'. @@ -188,3 +192,30 @@ func TestLogWritingParallel(t *testing.T) { } } } + +type TaskLogsMocks struct { + clock *clock.Mock + broadcaster *mocks.MockChangeBroadcaster +} + +func taskLogsTestFixtures(t *testing.T) (*Storage, func(), *TaskLogsMocks) { + mockCtrl := gomock.NewController(t) + + mocks := &TaskLogsMocks{ + broadcaster: mocks.NewMockChangeBroadcaster(mockCtrl), + } + + temppath, err := ioutil.TempDir("", "testlogs") + if err != nil { + panic(err) + } + + // This should be called at the end of each unit test. + finish := func() { + os.RemoveAll(temppath) + mockCtrl.Finish() + } + + sm := NewStorage(temppath, mocks.broadcaster) + return sm, finish, mocks +}