From d4429d593cfce9f7d0a668f98d8e85477a83642f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Tue, 5 Jul 2022 17:58:58 +0200 Subject: [PATCH] Unify task log storage & manager-local storage The task logs storage system is refactored to use the `local_storage` package. Configuration options have also changed: - `task_logs_path` is renamed to `local_manager_storage_path`, to emphasise that only the Manager deals with those files, with default value `./flamenco-manager-storage`. - `storage_path` is renamed to `shared_storage_path`, to emphasise this is the storage shared between Manager and Workers, with default value `./flamenco-shared-storage`. Task logs are still stored in `${local_manager_storage_path}/job-{jobUUID[0:4]}/{jobUUID}/task-{taskUUID}.txt` Manifest task: T99409 --- cmd/flamenco-manager/main.go | 6 +- internal/manager/api_impl/varrepl_test.go | 2 +- internal/manager/config/config.go | 29 ++++--- internal/manager/config/defaults.go | 8 +- internal/manager/config/settings_test.go | 8 +- .../task_logs/mocks/interfaces_mock.gen.go | 39 +++++++++- internal/manager/task_logs/task_logs.go | 42 ++++------ internal/manager/task_logs/task_logs_test.go | 76 +++++++++---------- 8 files changed, 118 insertions(+), 92 deletions(-) diff --git a/cmd/flamenco-manager/main.go b/cmd/flamenco-manager/main.go index 9fd0c843..406af0fe 100644 --- a/cmd/flamenco-manager/main.go +++ b/cmd/flamenco-manager/main.go @@ -115,10 +115,8 @@ func main() { timeService := clock.New() webUpdater := webupdates.New() - // TODO: the local storage now is hard-coded to use the same sub-directory as the task log storage. - // This should be refactored so that the task logs storage uses the localStorage object as well. - localStorage := local_storage.NewNextToExe("task-logs") - logStorage := task_logs.NewStorage(configService.Get().TaskLogsPath, timeService, webUpdater) + localStorage := local_storage.NewNextToExe(configService.Get().LocalManagerStoragePath) + logStorage := task_logs.NewStorage(localStorage, timeService, webUpdater) taskStateMachine := task_state_machine.NewStateMachine(persist, webUpdater, logStorage) lastRender := last_rendered.New(localStorage) diff --git a/internal/manager/api_impl/varrepl_test.go b/internal/manager/api_impl/varrepl_test.go index de779f63..0f86dcac 100644 --- a/internal/manager/api_impl/varrepl_test.go +++ b/internal/manager/api_impl/varrepl_test.go @@ -102,7 +102,7 @@ func TestReplaceJobsVariable(t *testing.T) { // Having the Shaman enabled should create an implicit variable "{jobs}". conf := config.GetTestConfig(func(c *config.Conf) { - c.StoragePath = "/path/to/flamenco-storage" + c.SharedStoragePath = "/path/to/flamenco-storage" c.Shaman.Enabled = true }) diff --git a/internal/manager/config/config.go b/internal/manager/config/config.go index 22eaa92f..e50bbb02 100644 --- a/internal/manager/config/config.go +++ b/internal/manager/config/config.go @@ -66,16 +66,21 @@ type ConfMeta struct { type Base struct { Meta ConfMeta `yaml:"_meta"` - ManagerName string `yaml:"manager_name"` - DatabaseDSN string `yaml:"database"` - TaskLogsPath string `yaml:"task_logs_path"` - Listen string `yaml:"listen"` + ManagerName string `yaml:"manager_name"` + DatabaseDSN string `yaml:"database"` + Listen string `yaml:"listen"` SSDPDiscovery bool `yaml:"autodiscoverable"` - // Storage configuration: - StoragePath string `yaml:"storage_path"` - Shaman shaman_config.Config `yaml:"shaman"` + // LocalManagerStoragePath is where the Manager stores its files, like task + // logs, last-rendered images, etc. + LocalManagerStoragePath string `yaml:"local_manager_storage_path"` + + // SharedStoragePath is where files shared between Manager and Workers go, + // like the blend files of a render job. + SharedStoragePath string `yaml:"shared_storage_path"` + + Shaman shaman_config.Config `yaml:"shaman"` TaskTimeout time.Duration `yaml:"task_timeout"` WorkerTimeout time.Duration `yaml:"worker_timeout"` @@ -213,18 +218,18 @@ func (c *Conf) processAfterLoading(override ...func(c *Conf)) { } func (c *Conf) processStorage() { - storagePath, err := filepath.Abs(c.StoragePath) + storagePath, err := filepath.Abs(c.SharedStoragePath) if err != nil { log.Error().Err(err). - Str("storage_path", c.StoragePath). + Str("storage_path", c.SharedStoragePath). Msg("unable to determine absolute storage path") } else { - c.StoragePath = storagePath + c.SharedStoragePath = storagePath } // Shaman should use the Flamenco storage location. if c.Shaman.Enabled { - c.Shaman.StoragePath = c.StoragePath + c.Shaman.StoragePath = c.SharedStoragePath } } @@ -236,7 +241,7 @@ func (c *Conf) EffectiveStoragePath() string { if c.Shaman.Enabled { jobStorage = c.Shaman.CheckoutPath() } else { - jobStorage = c.StoragePath + jobStorage = c.SharedStoragePath } absPath, err := filepath.Abs(jobStorage) diff --git a/internal/manager/config/defaults.go b/internal/manager/config/defaults.go index 176b2d4f..af2d561f 100644 --- a/internal/manager/config/defaults.go +++ b/internal/manager/config/defaults.go @@ -17,10 +17,10 @@ var defaultConfig = Conf{ ManagerName: "Flamenco Manager", Listen: ":8080", // ListenHTTPS: ":8433", - DatabaseDSN: "flamenco-manager.sqlite", - TaskLogsPath: "./task-logs", - SSDPDiscovery: true, - StoragePath: "./flamenco-storage", + DatabaseDSN: "flamenco-manager.sqlite", + SSDPDiscovery: true, + LocalManagerStoragePath: "./flamenco-manager-storage", + SharedStoragePath: "./flamenco-shared-storage", Shaman: shaman_config.Config{ // Enable Shaman by default, except on Windows where symlinks are still tricky. diff --git a/internal/manager/config/settings_test.go b/internal/manager/config/settings_test.go index 50eee65a..4920ce33 100644 --- a/internal/manager/config/settings_test.go +++ b/internal/manager/config/settings_test.go @@ -14,7 +14,7 @@ func TestDefaultSettings(t *testing.T) { // The settings should contain the defaults, though. assert.Equal(t, latestConfigVersion, config.Meta.Version) - assert.Equal(t, defaultConfig.TaskLogsPath, config.TaskLogsPath) + assert.Equal(t, defaultConfig.LocalManagerStoragePath, config.LocalManagerStoragePath) assert.Equal(t, defaultConfig.DatabaseDSN, config.DatabaseDSN) assert.Equal(t, false, config.Variables["ffmpeg"].IsTwoWay) @@ -46,7 +46,7 @@ func TestVariableValidation(t *testing.T) { func TestStorageImplicitVariablesWithShaman(t *testing.T) { c := DefaultConfig(func(c *Conf) { // Having the Shaman enabled should create an implicit variable "{jobs}" at the Shaman checkout path. - c.StoragePath = "/path/to/shaman/storage" + c.SharedStoragePath = "/path/to/shaman/storage" c.Shaman.Enabled = true c.Variables["jobs"] = Variable{ @@ -72,7 +72,7 @@ func TestStorageImplicitVariablesWithShaman(t *testing.T) { func TestStorageImplicitVariablesWithoutShaman(t *testing.T) { c := DefaultConfig(func(c *Conf) { // Having the Shaman disabled should create an implicit variable "{jobs}" at the storage path. - c.StoragePath = "/path/to/shaman/storage" + c.SharedStoragePath = "/path/to/shaman/storage" c.Shaman.Enabled = false c.Variables["jobs"] = Variable{ @@ -92,5 +92,5 @@ func TestStorageImplicitVariablesWithoutShaman(t *testing.T) { t.FailNow() } assert.False(t, c.implicitVariables["jobs"].IsTwoWay) - assert.Equal(t, c.StoragePath, c.implicitVariables["jobs"].Values[0].Value) + assert.Equal(t, c.SharedStoragePath, c.implicitVariables["jobs"].Values[0].Value) } diff --git a/internal/manager/task_logs/mocks/interfaces_mock.gen.go b/internal/manager/task_logs/mocks/interfaces_mock.gen.go index 51308612..28658c0e 100644 --- a/internal/manager/task_logs/mocks/interfaces_mock.gen.go +++ b/internal/manager/task_logs/mocks/interfaces_mock.gen.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: git.blender.org/flamenco/internal/manager/task_logs (interfaces: ChangeBroadcaster) +// Source: git.blender.org/flamenco/internal/manager/task_logs (interfaces: LocalStorage,ChangeBroadcaster) // Package mocks is a generated GoMock package. package mocks @@ -11,6 +11,43 @@ import ( gomock "github.com/golang/mock/gomock" ) +// MockLocalStorage is a mock of LocalStorage interface. +type MockLocalStorage struct { + ctrl *gomock.Controller + recorder *MockLocalStorageMockRecorder +} + +// MockLocalStorageMockRecorder is the mock recorder for MockLocalStorage. +type MockLocalStorageMockRecorder struct { + mock *MockLocalStorage +} + +// NewMockLocalStorage creates a new mock instance. +func NewMockLocalStorage(ctrl *gomock.Controller) *MockLocalStorage { + mock := &MockLocalStorage{ctrl: ctrl} + mock.recorder = &MockLocalStorageMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockLocalStorage) EXPECT() *MockLocalStorageMockRecorder { + return m.recorder +} + +// ForJob mocks base method. +func (m *MockLocalStorage) ForJob(arg0 string) string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ForJob", arg0) + ret0, _ := ret[0].(string) + return ret0 +} + +// ForJob indicates an expected call of ForJob. +func (mr *MockLocalStorageMockRecorder) ForJob(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ForJob", reflect.TypeOf((*MockLocalStorage)(nil).ForJob), arg0) +} + // MockChangeBroadcaster is a mock of ChangeBroadcaster interface. type MockChangeBroadcaster struct { ctrl *gomock.Controller diff --git a/internal/manager/task_logs/task_logs.go b/internal/manager/task_logs/task_logs.go index 2422acfd..561be1ef 100644 --- a/internal/manager/task_logs/task_logs.go +++ b/internal/manager/task_logs/task_logs.go @@ -8,7 +8,6 @@ import ( "io" "os" "path" - "path/filepath" "sync" "time" @@ -16,7 +15,6 @@ import ( "git.blender.org/flamenco/pkg/api" "github.com/benbjohnson/clock" "github.com/rs/zerolog" - "github.com/rs/zerolog/log" ) const ( @@ -26,7 +24,7 @@ const ( // Storage can write data to task logs, rotate logs, etc. type Storage struct { - BasePath string // Directory where task logs are stored. + localStorage LocalStorage clock clock.Clock broadcaster ChangeBroadcaster @@ -37,7 +35,12 @@ type Storage struct { } // Generate mock implementations of these interfaces. -//go:generate go run github.com/golang/mock/mockgen -destination mocks/interfaces_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/task_logs ChangeBroadcaster +//go:generate go run github.com/golang/mock/mockgen -destination mocks/interfaces_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/task_logs LocalStorage,ChangeBroadcaster + +type LocalStorage interface { + // ForJob returns the absolute directory path for storing job-related files. + ForJob(jobUUID string) string +} type ChangeBroadcaster interface { // BroadcastTaskLogUpdate sends the task log update to SocketIO clients. @@ -49,28 +52,16 @@ var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil) // NewStorage creates a new log storage rooted at `basePath`. func NewStorage( - basePath string, + localStorage LocalStorage, clock clock.Clock, broadcaster ChangeBroadcaster, ) *Storage { - if !filepath.IsAbs(basePath) { - absPath, err := filepath.Abs(basePath) - if err != nil { - log.Panic().Err(err).Str("path", basePath).Msg("cannot resolve relative path to task logs") - } - basePath = absPath - } - - log.Info(). - Str("path", basePath). - Msg("task logs") - return &Storage{ - BasePath: basePath, - clock: clock, - broadcaster: broadcaster, - mutex: new(sync.Mutex), - taskLocks: make(map[string]*sync.Mutex), + localStorage: localStorage, + clock: clock, + broadcaster: broadcaster, + mutex: new(sync.Mutex), + taskLocks: make(map[string]*sync.Mutex), } } @@ -163,12 +154,7 @@ func (s *Storage) RotateFile(logger zerolog.Logger, jobID, taskID string) { // file handling code in this source file is migrated to use the `local_storage` // package at some point. func (s *Storage) filepath(jobID, taskID string) string { - var dirpath string - if jobID == "" { - dirpath = path.Join(s.BasePath, "jobless") - } else { - dirpath = path.Join(s.BasePath, "job-"+jobID[:4], jobID) - } + dirpath := s.localStorage.ForJob(jobID) filename := fmt.Sprintf("task-%v.txt", taskID) return path.Join(dirpath, filename) } diff --git a/internal/manager/task_logs/task_logs_test.go b/internal/manager/task_logs/task_logs_test.go index e7a5ae23..3cc0a203 100644 --- a/internal/manager/task_logs/task_logs_test.go +++ b/internal/manager/task_logs/task_logs_test.go @@ -26,27 +26,22 @@ func TestLogWriting(t *testing.T) { s, finish, mocks := taskLogsTestFixtures(t) defer finish() + jobUUID := "25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c" + taskUUID := "20ff9d06-53ec-4019-9e2e-1774f05f170a" + jobDir := filepath.Join(mocks.temppath, "job-25c5", jobUUID) + // Expect broadcastst for each call to s.Write() - mocks.broadcaster.EXPECT().BroadcastTaskLogUpdate(gomock.Any()).Times(2) + numWriteCalls := 2 + mocks.broadcaster.EXPECT().BroadcastTaskLogUpdate(gomock.Any()).Times(numWriteCalls) + mocks.localStorage.EXPECT().ForJob(jobUUID).Times(numWriteCalls).Return(jobDir) - err := s.Write(zerolog.Nop(), - "25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c", - "20ff9d06-53ec-4019-9e2e-1774f05f170a", - "Ovo je priča") + err := s.Write(zerolog.Nop(), jobUUID, taskUUID, "Ovo je priča") assert.NoError(t, err) - err = s.Write(zerolog.Nop(), - "25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c", - "20ff9d06-53ec-4019-9e2e-1774f05f170a", - "Ima dvije linije") + err = s.Write(zerolog.Nop(), jobUUID, taskUUID, "Ima dvije linije") assert.NoError(t, err) - filename := filepath.Join( - s.BasePath, - "job-25c5", - "25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c", - "task-20ff9d06-53ec-4019-9e2e-1774f05f170a.txt") - + filename := filepath.Join(jobDir, "task-20ff9d06-53ec-4019-9e2e-1774f05f170a.txt") contents, err := ioutil.ReadFile(filename) assert.NoError(t, err, "the log file should exist") assert.Equal(t, "Ovo je priča\nIma dvije linije\n", string(contents)) @@ -56,23 +51,19 @@ func TestLogRotation(t *testing.T) { s, finish, mocks := taskLogsTestFixtures(t) defer finish() - mocks.broadcaster.EXPECT().BroadcastTaskLogUpdate(gomock.Any()) + jobUUID := "25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c" + taskUUID := "20ff9d06-53ec-4019-9e2e-1774f05f170a" + jobDir := filepath.Join(mocks.temppath, "job-25c5", jobUUID) - err := s.Write(zerolog.Nop(), - "25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c", - "20ff9d06-53ec-4019-9e2e-1774f05f170a", - "Ovo je priča") + mocks.broadcaster.EXPECT().BroadcastTaskLogUpdate(gomock.Any()) + mocks.localStorage.EXPECT().ForJob(jobUUID).Return(jobDir).AnyTimes() + + err := s.Write(zerolog.Nop(), jobUUID, taskUUID, "Ovo je priča") assert.NoError(t, err) - s.RotateFile(zerolog.Nop(), - "25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c", - "20ff9d06-53ec-4019-9e2e-1774f05f170a") + s.RotateFile(zerolog.Nop(), jobUUID, taskUUID) - filename := filepath.Join( - s.BasePath, - "job-25c5", - "25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c", - "task-20ff9d06-53ec-4019-9e2e-1774f05f170a.txt") + filename := filepath.Join(jobDir, "task-20ff9d06-53ec-4019-9e2e-1774f05f170a.txt") rotatedFilename := filename + ".1" contents, err := ioutil.ReadFile(rotatedFilename) @@ -89,9 +80,11 @@ func TestLogTail(t *testing.T) { jobID := "25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c" taskID := "20ff9d06-53ec-4019-9e2e-1774f05f170a" + jobDir := filepath.Join(mocks.temppath, "job-25c5", jobID) // Expect broadcastst for each call to s.Write() mocks.broadcaster.EXPECT().BroadcastTaskLogUpdate(gomock.Any()).Times(3) + mocks.localStorage.EXPECT().ForJob(jobID).Return(jobDir).AnyTimes() contents, err := s.Tail(jobID, taskID) assert.ErrorIs(t, err, os.ErrNotExist) @@ -158,8 +151,10 @@ func TestLogWritingParallel(t *testing.T) { jobID := "6d9a05a1-261e-4f6f-93b0-8c4f6b6d500d" taskID := "d19888cc-c389-4a24-aebf-8458ababdb02" + jobDir := filepath.Join(mocks.temppath, "job-25c5", jobID) mocks.broadcaster.EXPECT().BroadcastTaskLogUpdate(gomock.Any()).Times(numGoroutines) + mocks.localStorage.EXPECT().ForJob(jobID).Return(jobDir).AnyTimes() for i := 0; i < numGoroutines; i++ { // Write lines of 100 characters to the task log. Each goroutine writes a @@ -197,16 +192,26 @@ func TestLogWritingParallel(t *testing.T) { } type TaskLogsMocks struct { - clock *clock.Mock - broadcaster *mocks.MockChangeBroadcaster + temppath string + + localStorage *mocks.MockLocalStorage + clock *clock.Mock + broadcaster *mocks.MockChangeBroadcaster } func taskLogsTestFixtures(t *testing.T) (*Storage, func(), *TaskLogsMocks) { mockCtrl := gomock.NewController(t) + temppath, err := ioutil.TempDir("", "testlogs") + if err != nil { + panic(err) + } + mocks := &TaskLogsMocks{ - clock: clock.NewMock(), - broadcaster: mocks.NewMockChangeBroadcaster(mockCtrl), + temppath: temppath, + localStorage: mocks.NewMockLocalStorage(mockCtrl), + clock: clock.NewMock(), + broadcaster: mocks.NewMockChangeBroadcaster(mockCtrl), } mockedNow, err := time.Parse(time.RFC3339, "2022-06-09T16:52:04+02:00") @@ -215,17 +220,12 @@ func taskLogsTestFixtures(t *testing.T) (*Storage, func(), *TaskLogsMocks) { } mocks.clock.Set(mockedNow) - temppath, err := ioutil.TempDir("", "testlogs") - if err != nil { - panic(err) - } - // This should be called at the end of each unit test. finish := func() { os.RemoveAll(temppath) mockCtrl.Finish() } - sm := NewStorage(temppath, mocks.clock, mocks.broadcaster) + sm := NewStorage(mocks.localStorage, mocks.clock, mocks.broadcaster) return sm, finish, mocks }