Unify task log storage & manager-local storage

The task logs storage system is refactored to use the `local_storage`
package. Configuration options have also changed:

- `task_logs_path` is renamed to `local_manager_storage_path`, to
  emphasise that only the Manager deals with those files, with default
  value `./flamenco-manager-storage`.
- `storage_path` is renamed to `shared_storage_path`, to emphasise this
  is the storage shared between Manager and Workers, with default value
  `./flamenco-shared-storage`.

Task logs are still stored in
`${local_manager_storage_path}/job-{jobUUID[0:4]}/{jobUUID}/task-{taskUUID}.txt`

Manifest task: T99409
This commit is contained in:
Sybren A. Stüvel 2022-07-05 17:58:58 +02:00
parent 9f9a278634
commit d4429d593c
8 changed files with 118 additions and 92 deletions

View File

@ -115,10 +115,8 @@ func main() {
timeService := clock.New()
webUpdater := webupdates.New()
// TODO: the local storage now is hard-coded to use the same sub-directory as the task log storage.
// This should be refactored so that the task logs storage uses the localStorage object as well.
localStorage := local_storage.NewNextToExe("task-logs")
logStorage := task_logs.NewStorage(configService.Get().TaskLogsPath, timeService, webUpdater)
localStorage := local_storage.NewNextToExe(configService.Get().LocalManagerStoragePath)
logStorage := task_logs.NewStorage(localStorage, timeService, webUpdater)
taskStateMachine := task_state_machine.NewStateMachine(persist, webUpdater, logStorage)
lastRender := last_rendered.New(localStorage)

View File

@ -102,7 +102,7 @@ func TestReplaceJobsVariable(t *testing.T) {
// Having the Shaman enabled should create an implicit variable "{jobs}".
conf := config.GetTestConfig(func(c *config.Conf) {
c.StoragePath = "/path/to/flamenco-storage"
c.SharedStoragePath = "/path/to/flamenco-storage"
c.Shaman.Enabled = true
})

View File

@ -66,16 +66,21 @@ type ConfMeta struct {
type Base struct {
Meta ConfMeta `yaml:"_meta"`
ManagerName string `yaml:"manager_name"`
DatabaseDSN string `yaml:"database"`
TaskLogsPath string `yaml:"task_logs_path"`
Listen string `yaml:"listen"`
ManagerName string `yaml:"manager_name"`
DatabaseDSN string `yaml:"database"`
Listen string `yaml:"listen"`
SSDPDiscovery bool `yaml:"autodiscoverable"`
// Storage configuration:
StoragePath string `yaml:"storage_path"`
Shaman shaman_config.Config `yaml:"shaman"`
// LocalManagerStoragePath is where the Manager stores its files, like task
// logs, last-rendered images, etc.
LocalManagerStoragePath string `yaml:"local_manager_storage_path"`
// SharedStoragePath is where files shared between Manager and Workers go,
// like the blend files of a render job.
SharedStoragePath string `yaml:"shared_storage_path"`
Shaman shaman_config.Config `yaml:"shaman"`
TaskTimeout time.Duration `yaml:"task_timeout"`
WorkerTimeout time.Duration `yaml:"worker_timeout"`
@ -213,18 +218,18 @@ func (c *Conf) processAfterLoading(override ...func(c *Conf)) {
}
func (c *Conf) processStorage() {
storagePath, err := filepath.Abs(c.StoragePath)
storagePath, err := filepath.Abs(c.SharedStoragePath)
if err != nil {
log.Error().Err(err).
Str("storage_path", c.StoragePath).
Str("storage_path", c.SharedStoragePath).
Msg("unable to determine absolute storage path")
} else {
c.StoragePath = storagePath
c.SharedStoragePath = storagePath
}
// Shaman should use the Flamenco storage location.
if c.Shaman.Enabled {
c.Shaman.StoragePath = c.StoragePath
c.Shaman.StoragePath = c.SharedStoragePath
}
}
@ -236,7 +241,7 @@ func (c *Conf) EffectiveStoragePath() string {
if c.Shaman.Enabled {
jobStorage = c.Shaman.CheckoutPath()
} else {
jobStorage = c.StoragePath
jobStorage = c.SharedStoragePath
}
absPath, err := filepath.Abs(jobStorage)

View File

@ -17,10 +17,10 @@ var defaultConfig = Conf{
ManagerName: "Flamenco Manager",
Listen: ":8080",
// ListenHTTPS: ":8433",
DatabaseDSN: "flamenco-manager.sqlite",
TaskLogsPath: "./task-logs",
SSDPDiscovery: true,
StoragePath: "./flamenco-storage",
DatabaseDSN: "flamenco-manager.sqlite",
SSDPDiscovery: true,
LocalManagerStoragePath: "./flamenco-manager-storage",
SharedStoragePath: "./flamenco-shared-storage",
Shaman: shaman_config.Config{
// Enable Shaman by default, except on Windows where symlinks are still tricky.

View File

@ -14,7 +14,7 @@ func TestDefaultSettings(t *testing.T) {
// The settings should contain the defaults, though.
assert.Equal(t, latestConfigVersion, config.Meta.Version)
assert.Equal(t, defaultConfig.TaskLogsPath, config.TaskLogsPath)
assert.Equal(t, defaultConfig.LocalManagerStoragePath, config.LocalManagerStoragePath)
assert.Equal(t, defaultConfig.DatabaseDSN, config.DatabaseDSN)
assert.Equal(t, false, config.Variables["ffmpeg"].IsTwoWay)
@ -46,7 +46,7 @@ func TestVariableValidation(t *testing.T) {
func TestStorageImplicitVariablesWithShaman(t *testing.T) {
c := DefaultConfig(func(c *Conf) {
// Having the Shaman enabled should create an implicit variable "{jobs}" at the Shaman checkout path.
c.StoragePath = "/path/to/shaman/storage"
c.SharedStoragePath = "/path/to/shaman/storage"
c.Shaman.Enabled = true
c.Variables["jobs"] = Variable{
@ -72,7 +72,7 @@ func TestStorageImplicitVariablesWithShaman(t *testing.T) {
func TestStorageImplicitVariablesWithoutShaman(t *testing.T) {
c := DefaultConfig(func(c *Conf) {
// Having the Shaman disabled should create an implicit variable "{jobs}" at the storage path.
c.StoragePath = "/path/to/shaman/storage"
c.SharedStoragePath = "/path/to/shaman/storage"
c.Shaman.Enabled = false
c.Variables["jobs"] = Variable{
@ -92,5 +92,5 @@ func TestStorageImplicitVariablesWithoutShaman(t *testing.T) {
t.FailNow()
}
assert.False(t, c.implicitVariables["jobs"].IsTwoWay)
assert.Equal(t, c.StoragePath, c.implicitVariables["jobs"].Values[0].Value)
assert.Equal(t, c.SharedStoragePath, c.implicitVariables["jobs"].Values[0].Value)
}

View File

@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: git.blender.org/flamenco/internal/manager/task_logs (interfaces: ChangeBroadcaster)
// Source: git.blender.org/flamenco/internal/manager/task_logs (interfaces: LocalStorage,ChangeBroadcaster)
// Package mocks is a generated GoMock package.
package mocks
@ -11,6 +11,43 @@ import (
gomock "github.com/golang/mock/gomock"
)
// MockLocalStorage is a mock of LocalStorage interface.
type MockLocalStorage struct {
ctrl *gomock.Controller
recorder *MockLocalStorageMockRecorder
}
// MockLocalStorageMockRecorder is the mock recorder for MockLocalStorage.
type MockLocalStorageMockRecorder struct {
mock *MockLocalStorage
}
// NewMockLocalStorage creates a new mock instance.
func NewMockLocalStorage(ctrl *gomock.Controller) *MockLocalStorage {
mock := &MockLocalStorage{ctrl: ctrl}
mock.recorder = &MockLocalStorageMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockLocalStorage) EXPECT() *MockLocalStorageMockRecorder {
return m.recorder
}
// ForJob mocks base method.
func (m *MockLocalStorage) ForJob(arg0 string) string {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ForJob", arg0)
ret0, _ := ret[0].(string)
return ret0
}
// ForJob indicates an expected call of ForJob.
func (mr *MockLocalStorageMockRecorder) ForJob(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ForJob", reflect.TypeOf((*MockLocalStorage)(nil).ForJob), arg0)
}
// MockChangeBroadcaster is a mock of ChangeBroadcaster interface.
type MockChangeBroadcaster struct {
ctrl *gomock.Controller

View File

@ -8,7 +8,6 @@ import (
"io"
"os"
"path"
"path/filepath"
"sync"
"time"
@ -16,7 +15,6 @@ import (
"git.blender.org/flamenco/pkg/api"
"github.com/benbjohnson/clock"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
const (
@ -26,7 +24,7 @@ const (
// Storage can write data to task logs, rotate logs, etc.
type Storage struct {
BasePath string // Directory where task logs are stored.
localStorage LocalStorage
clock clock.Clock
broadcaster ChangeBroadcaster
@ -37,7 +35,12 @@ type Storage struct {
}
// Generate mock implementations of these interfaces.
//go:generate go run github.com/golang/mock/mockgen -destination mocks/interfaces_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/task_logs ChangeBroadcaster
//go:generate go run github.com/golang/mock/mockgen -destination mocks/interfaces_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/task_logs LocalStorage,ChangeBroadcaster
type LocalStorage interface {
// ForJob returns the absolute directory path for storing job-related files.
ForJob(jobUUID string) string
}
type ChangeBroadcaster interface {
// BroadcastTaskLogUpdate sends the task log update to SocketIO clients.
@ -49,28 +52,16 @@ var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil)
// NewStorage creates a new log storage rooted at `basePath`.
func NewStorage(
basePath string,
localStorage LocalStorage,
clock clock.Clock,
broadcaster ChangeBroadcaster,
) *Storage {
if !filepath.IsAbs(basePath) {
absPath, err := filepath.Abs(basePath)
if err != nil {
log.Panic().Err(err).Str("path", basePath).Msg("cannot resolve relative path to task logs")
}
basePath = absPath
}
log.Info().
Str("path", basePath).
Msg("task logs")
return &Storage{
BasePath: basePath,
clock: clock,
broadcaster: broadcaster,
mutex: new(sync.Mutex),
taskLocks: make(map[string]*sync.Mutex),
localStorage: localStorage,
clock: clock,
broadcaster: broadcaster,
mutex: new(sync.Mutex),
taskLocks: make(map[string]*sync.Mutex),
}
}
@ -163,12 +154,7 @@ func (s *Storage) RotateFile(logger zerolog.Logger, jobID, taskID string) {
// file handling code in this source file is migrated to use the `local_storage`
// package at some point.
func (s *Storage) filepath(jobID, taskID string) string {
var dirpath string
if jobID == "" {
dirpath = path.Join(s.BasePath, "jobless")
} else {
dirpath = path.Join(s.BasePath, "job-"+jobID[:4], jobID)
}
dirpath := s.localStorage.ForJob(jobID)
filename := fmt.Sprintf("task-%v.txt", taskID)
return path.Join(dirpath, filename)
}

View File

@ -26,27 +26,22 @@ func TestLogWriting(t *testing.T) {
s, finish, mocks := taskLogsTestFixtures(t)
defer finish()
jobUUID := "25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c"
taskUUID := "20ff9d06-53ec-4019-9e2e-1774f05f170a"
jobDir := filepath.Join(mocks.temppath, "job-25c5", jobUUID)
// Expect broadcastst for each call to s.Write()
mocks.broadcaster.EXPECT().BroadcastTaskLogUpdate(gomock.Any()).Times(2)
numWriteCalls := 2
mocks.broadcaster.EXPECT().BroadcastTaskLogUpdate(gomock.Any()).Times(numWriteCalls)
mocks.localStorage.EXPECT().ForJob(jobUUID).Times(numWriteCalls).Return(jobDir)
err := s.Write(zerolog.Nop(),
"25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c",
"20ff9d06-53ec-4019-9e2e-1774f05f170a",
"Ovo je priča")
err := s.Write(zerolog.Nop(), jobUUID, taskUUID, "Ovo je priča")
assert.NoError(t, err)
err = s.Write(zerolog.Nop(),
"25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c",
"20ff9d06-53ec-4019-9e2e-1774f05f170a",
"Ima dvije linije")
err = s.Write(zerolog.Nop(), jobUUID, taskUUID, "Ima dvije linije")
assert.NoError(t, err)
filename := filepath.Join(
s.BasePath,
"job-25c5",
"25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c",
"task-20ff9d06-53ec-4019-9e2e-1774f05f170a.txt")
filename := filepath.Join(jobDir, "task-20ff9d06-53ec-4019-9e2e-1774f05f170a.txt")
contents, err := ioutil.ReadFile(filename)
assert.NoError(t, err, "the log file should exist")
assert.Equal(t, "Ovo je priča\nIma dvije linije\n", string(contents))
@ -56,23 +51,19 @@ func TestLogRotation(t *testing.T) {
s, finish, mocks := taskLogsTestFixtures(t)
defer finish()
mocks.broadcaster.EXPECT().BroadcastTaskLogUpdate(gomock.Any())
jobUUID := "25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c"
taskUUID := "20ff9d06-53ec-4019-9e2e-1774f05f170a"
jobDir := filepath.Join(mocks.temppath, "job-25c5", jobUUID)
err := s.Write(zerolog.Nop(),
"25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c",
"20ff9d06-53ec-4019-9e2e-1774f05f170a",
"Ovo je priča")
mocks.broadcaster.EXPECT().BroadcastTaskLogUpdate(gomock.Any())
mocks.localStorage.EXPECT().ForJob(jobUUID).Return(jobDir).AnyTimes()
err := s.Write(zerolog.Nop(), jobUUID, taskUUID, "Ovo je priča")
assert.NoError(t, err)
s.RotateFile(zerolog.Nop(),
"25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c",
"20ff9d06-53ec-4019-9e2e-1774f05f170a")
s.RotateFile(zerolog.Nop(), jobUUID, taskUUID)
filename := filepath.Join(
s.BasePath,
"job-25c5",
"25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c",
"task-20ff9d06-53ec-4019-9e2e-1774f05f170a.txt")
filename := filepath.Join(jobDir, "task-20ff9d06-53ec-4019-9e2e-1774f05f170a.txt")
rotatedFilename := filename + ".1"
contents, err := ioutil.ReadFile(rotatedFilename)
@ -89,9 +80,11 @@ func TestLogTail(t *testing.T) {
jobID := "25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c"
taskID := "20ff9d06-53ec-4019-9e2e-1774f05f170a"
jobDir := filepath.Join(mocks.temppath, "job-25c5", jobID)
// Expect broadcastst for each call to s.Write()
mocks.broadcaster.EXPECT().BroadcastTaskLogUpdate(gomock.Any()).Times(3)
mocks.localStorage.EXPECT().ForJob(jobID).Return(jobDir).AnyTimes()
contents, err := s.Tail(jobID, taskID)
assert.ErrorIs(t, err, os.ErrNotExist)
@ -158,8 +151,10 @@ func TestLogWritingParallel(t *testing.T) {
jobID := "6d9a05a1-261e-4f6f-93b0-8c4f6b6d500d"
taskID := "d19888cc-c389-4a24-aebf-8458ababdb02"
jobDir := filepath.Join(mocks.temppath, "job-25c5", jobID)
mocks.broadcaster.EXPECT().BroadcastTaskLogUpdate(gomock.Any()).Times(numGoroutines)
mocks.localStorage.EXPECT().ForJob(jobID).Return(jobDir).AnyTimes()
for i := 0; i < numGoroutines; i++ {
// Write lines of 100 characters to the task log. Each goroutine writes a
@ -197,16 +192,26 @@ func TestLogWritingParallel(t *testing.T) {
}
type TaskLogsMocks struct {
clock *clock.Mock
broadcaster *mocks.MockChangeBroadcaster
temppath string
localStorage *mocks.MockLocalStorage
clock *clock.Mock
broadcaster *mocks.MockChangeBroadcaster
}
func taskLogsTestFixtures(t *testing.T) (*Storage, func(), *TaskLogsMocks) {
mockCtrl := gomock.NewController(t)
temppath, err := ioutil.TempDir("", "testlogs")
if err != nil {
panic(err)
}
mocks := &TaskLogsMocks{
clock: clock.NewMock(),
broadcaster: mocks.NewMockChangeBroadcaster(mockCtrl),
temppath: temppath,
localStorage: mocks.NewMockLocalStorage(mockCtrl),
clock: clock.NewMock(),
broadcaster: mocks.NewMockChangeBroadcaster(mockCtrl),
}
mockedNow, err := time.Parse(time.RFC3339, "2022-06-09T16:52:04+02:00")
@ -215,17 +220,12 @@ func taskLogsTestFixtures(t *testing.T) (*Storage, func(), *TaskLogsMocks) {
}
mocks.clock.Set(mockedNow)
temppath, err := ioutil.TempDir("", "testlogs")
if err != nil {
panic(err)
}
// This should be called at the end of each unit test.
finish := func() {
os.RemoveAll(temppath)
mockCtrl.Finish()
}
sm := NewStorage(temppath, mocks.clock, mocks.broadcaster)
sm := NewStorage(mocks.localStorage, mocks.clock, mocks.broadcaster)
return sm, finish, mocks
}