From 24204084c1c995ad9d4ff8b78eb0a637db10e212 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Thu, 9 Jun 2022 17:00:38 +0200 Subject: [PATCH] Manager: move timestamping of log messages to `task_logs` package In the future different services will write to the task log, and thus it makes sense to move the responsibility of prepending the timestamps to the log storage service. --- cmd/flamenco-manager/main.go | 2 +- internal/manager/api_impl/interfaces.go | 1 + .../manager/api_impl/mocks/api_impl_mock.gen.go | 14 ++++++++++++++ internal/manager/api_impl/workers.go | 15 ++++++--------- internal/manager/api_impl/workers_test.go | 10 +++++----- internal/manager/task_logs/task_logs.go | 17 +++++++++++------ internal/manager/task_logs/task_logs_test.go | 10 +++++++++- 7 files changed, 47 insertions(+), 22 deletions(-) diff --git a/cmd/flamenco-manager/main.go b/cmd/flamenco-manager/main.go index 2f528103..231229f1 100644 --- a/cmd/flamenco-manager/main.go +++ b/cmd/flamenco-manager/main.go @@ -159,7 +159,7 @@ func buildFlamencoAPI( if err != nil { log.Fatal().Err(err).Msg("error loading job compilers") } - logStorage := task_logs.NewStorage(configService.Get().TaskLogsPath, webUpdater) + logStorage := task_logs.NewStorage(configService.Get().TaskLogsPath, timeService, webUpdater) shamanServer := shaman.NewServer(configService.Get().Shaman, nil) flamenco := api_impl.NewFlamenco( compiler, persist, webUpdater, logStorage, configService, diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index fd6758c0..827ecb97 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -92,6 +92,7 @@ type JobCompiler interface { // LogStorage handles incoming task logs. type LogStorage interface { Write(logger zerolog.Logger, jobID, taskID string, logText string) error + WriteTimestamped(logger zerolog.Logger, jobID, taskID string, logText string) error RotateFile(logger zerolog.Logger, jobID, taskID string) Tail(jobID, taskID string) (string, error) } diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index bc603222..faad736d 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -448,6 +448,20 @@ func (mr *MockLogStorageMockRecorder) Write(arg0, arg1, arg2, arg3 interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Write", reflect.TypeOf((*MockLogStorage)(nil).Write), arg0, arg1, arg2, arg3) } +// WriteTimestamped mocks base method. +func (m *MockLogStorage) WriteTimestamped(arg0 zerolog.Logger, arg1, arg2, arg3 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WriteTimestamped", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// WriteTimestamped indicates an expected call of WriteTimestamped. +func (mr *MockLogStorageMockRecorder) WriteTimestamped(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteTimestamped", reflect.TypeOf((*MockLogStorage)(nil).WriteTimestamped), arg0, arg1, arg2, arg3) +} + // MockConfigService is a mock of ConfigService interface. type MockConfigService struct { ctrl *gomock.Controller diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go index af9c9401..10f57337 100644 --- a/internal/manager/api_impl/workers.go +++ b/internal/manager/api_impl/workers.go @@ -211,7 +211,8 @@ func (f *Flamenco) workerRequeueActiveTasks(ctx context.Context, logger zerolog. lastErr = err } - f.taskLogAppendTimestamped(logger, task, "Task was requeued by Manager because the worker assigned to it signed off.\n") + _ = f.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, + "Task was requeued by Manager because the worker assigned to it signed off.") } return lastErr @@ -319,8 +320,10 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error { } // Add a note to the task log about the worker assignment. - f.taskLogAppendTimestamped(logger, dbTask, - fmt.Sprintf("Task assigned to worker %s (%s)\n", worker.Name, worker.UUID)) + msg := fmt.Sprintf("Task assigned to worker %s (%s)", worker.Name, worker.UUID) + if err := f.logStorage.WriteTimestamped(logger, dbTask.Job.UUID, dbTask.UUID, msg); err != nil { + return sendAPIError(e, http.StatusInternalServerError, "internal error appending to task log: %v", err) + } // Start timeout measurement as soon as the Worker gets the task assigned. if err := f.workerPingedTask(e.Request().Context(), logger, dbTask); err != nil { @@ -466,12 +469,6 @@ func (f *Flamenco) workerPingedTask( return nil } -// taskLogAppendTimestamped writes the given log text, prefixed with the current date & time, to the task's log. -func (f *Flamenco) taskLogAppendTimestamped(logger zerolog.Logger, dbTask *persistence.Task, logText string) { - now := f.clock.Now().Format(time.RFC3339) - _ = f.logStorage.Write(logger, dbTask.Job.UUID, dbTask.UUID, now+" "+logText) -} - func (f *Flamenco) MayWorkerRun(e echo.Context, taskID string) error { logger := requestLogger(e) worker := requestWorkerOrPanic(e) diff --git a/internal/manager/api_impl/workers_test.go b/internal/manager/api_impl/workers_test.go index 8098ed42..4ec10d5f 100644 --- a/internal/manager/api_impl/workers_test.go +++ b/internal/manager/api_impl/workers_test.go @@ -36,8 +36,8 @@ func TestTaskScheduleHappy(t *testing.T) { mf.persistence.EXPECT().ScheduleTask(echo.Request().Context(), &worker).Return(&task, nil) mf.persistence.EXPECT().TaskTouchedByWorker(echo.Request().Context(), &task) - mf.logStorage.EXPECT().Write(gomock.Any(), job.UUID, task.UUID, - "2022-06-09T11:14:41+02:00 Task assigned to worker дрон (e7632d62-c3b8-4af0-9e78-01752928952c)\n") + mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, task.UUID, + "Task assigned to worker дрон (e7632d62-c3b8-4af0-9e78-01752928952c)") err := mf.flamenco.ScheduleTask(echo) assert.NoError(t, err) @@ -167,9 +167,9 @@ func TestWorkerSignoffTaskRequeue(t *testing.T) { // Expect this re-queueing to end up in the task's log and activity. mf.persistence.EXPECT().SaveTaskActivity(expectCtx, &task1) // TODO: test saved activity value mf.persistence.EXPECT().SaveTaskActivity(expectCtx, &task2) // TODO: test saved activity value - logMsg := "2022-06-09T11:14:41+02:00 Task was requeued by Manager because the worker assigned to it signed off.\n" - mf.logStorage.EXPECT().Write(gomock.Any(), job.UUID, task1.UUID, logMsg) - mf.logStorage.EXPECT().Write(gomock.Any(), job.UUID, task2.UUID, logMsg) + logMsg := "Task was requeued by Manager because the worker assigned to it signed off." + mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, task1.UUID, logMsg) + mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, task2.UUID, logMsg) // Expect worker to be saved as 'offline'. mf.persistence.EXPECT(). diff --git a/internal/manager/task_logs/task_logs.go b/internal/manager/task_logs/task_logs.go index bfd08f03..de828a49 100644 --- a/internal/manager/task_logs/task_logs.go +++ b/internal/manager/task_logs/task_logs.go @@ -10,9 +10,11 @@ import ( "path" "path/filepath" "sync" + "time" "git.blender.org/flamenco/internal/manager/webupdates" "git.blender.org/flamenco/pkg/api" + "github.com/benbjohnson/clock" "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) @@ -26,6 +28,7 @@ const ( type Storage struct { BasePath string // Directory where task logs are stored. + clock clock.Clock broadcaster ChangeBroadcaster // Locks to only allow one goroutine at a time to handle the logs of a certain task. @@ -47,6 +50,7 @@ var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil) // NewStorage creates a new log storage rooted at `basePath`. func NewStorage( basePath string, + clock clock.Clock, broadcaster ChangeBroadcaster, ) *Storage { if !filepath.IsAbs(basePath) { @@ -63,6 +67,7 @@ func NewStorage( return &Storage{ BasePath: basePath, + clock: clock, broadcaster: broadcaster, mutex: new(sync.Mutex), taskLocks: make(map[string]*sync.Mutex), @@ -81,12 +86,12 @@ func (s *Storage) Write(logger zerolog.Logger, jobID, taskID string, logText str return nil } -// // Write appends text, prefixed with the current date & time, to a task's log file, -// // and broadcasts the log lines via SocketIO. -// func (s *Storage) WriteTimestamped(logger zerolog.Logger, jobID, taskID string, logText string) error { -// now := s.clock.Now().Format(time.RFC3339) -// return s.Write(logger, jobID, taskID, now+" "+logText) -// } +// Write appends text, prefixed with the current date & time, to a task's log file, +// and broadcasts the log lines via SocketIO. +func (s *Storage) WriteTimestamped(logger zerolog.Logger, jobID, taskID string, logText string) error { + now := s.clock.Now().Format(time.RFC3339) + return s.Write(logger, jobID, taskID, now+" "+logText) +} func (s *Storage) writeToDisk(logger zerolog.Logger, jobID, taskID string, logText string) error { // Shortcut to avoid creating an empty log file. It also solves an diff --git a/internal/manager/task_logs/task_logs_test.go b/internal/manager/task_logs/task_logs_test.go index a70e28f2..a904937a 100644 --- a/internal/manager/task_logs/task_logs_test.go +++ b/internal/manager/task_logs/task_logs_test.go @@ -10,6 +10,7 @@ import ( "strings" "sync" "testing" + "time" "git.blender.org/flamenco/internal/manager/task_logs/mocks" "github.com/benbjohnson/clock" @@ -202,9 +203,16 @@ func taskLogsTestFixtures(t *testing.T) (*Storage, func(), *TaskLogsMocks) { mockCtrl := gomock.NewController(t) mocks := &TaskLogsMocks{ + clock: clock.NewMock(), broadcaster: mocks.NewMockChangeBroadcaster(mockCtrl), } + mockedNow, err := time.Parse(time.RFC3339, "2022-06-09T16:52:04+02:00") + if err != nil { + panic(err) + } + mocks.clock.Set(mockedNow) + temppath, err := ioutil.TempDir("", "testlogs") if err != nil { panic(err) @@ -216,6 +224,6 @@ func taskLogsTestFixtures(t *testing.T) (*Storage, func(), *TaskLogsMocks) { mockCtrl.Finish() } - sm := NewStorage(temppath, mocks.broadcaster) + sm := NewStorage(temppath, mocks.clock, mocks.broadcaster) return sm, finish, mocks }