Manager: implement FetchJobLastRenderedInfo() API operation

Allow querying for the URL & available versions of a job's last-rendered
image.
This commit is contained in:
Sybren A. Stüvel 2022-06-28 17:08:00 +02:00
parent 668e25fe95
commit 6efd67b05c
10 changed files with 191 additions and 35 deletions

View File

@ -122,8 +122,9 @@ func main() {
taskStateMachine := task_state_machine.NewStateMachine(persist, webUpdater, logStorage) taskStateMachine := task_state_machine.NewStateMachine(persist, webUpdater, logStorage)
lastRender := last_rendered.New(localStorage) lastRender := last_rendered.New(localStorage)
flamenco := buildFlamencoAPI(timeService, configService, persist, taskStateMachine, logStorage, webUpdater, lastRender) flamenco := buildFlamencoAPI(timeService, configService, persist, taskStateMachine,
e := buildWebService(flamenco, persist, ssdp, webUpdater, urls) logStorage, webUpdater, lastRender, localStorage)
e := buildWebService(flamenco, persist, ssdp, webUpdater, urls, localStorage)
timeoutChecker := timeout_checker.New( timeoutChecker := timeout_checker.New(
configService.Get().TaskTimeout, configService.Get().TaskTimeout,
@ -189,6 +190,7 @@ func buildFlamencoAPI(
logStorage *task_logs.Storage, logStorage *task_logs.Storage,
webUpdater *webupdates.BiDirComms, webUpdater *webupdates.BiDirComms,
lastRender *last_rendered.LastRenderedProcessor, lastRender *last_rendered.LastRenderedProcessor,
localStorage local_storage.StorageInfo,
) api.ServerInterface { ) api.ServerInterface {
compiler, err := job_compilers.Load(timeService) compiler, err := job_compilers.Load(timeService)
if err != nil { if err != nil {
@ -197,7 +199,8 @@ func buildFlamencoAPI(
shamanServer := shaman.NewServer(configService.Get().Shaman, nil) shamanServer := shaman.NewServer(configService.Get().Shaman, nil)
flamenco := api_impl.NewFlamenco( flamenco := api_impl.NewFlamenco(
compiler, persist, webUpdater, logStorage, configService, compiler, persist, webUpdater, logStorage, configService,
taskStateMachine, shamanServer, timeService, lastRender) taskStateMachine, shamanServer, timeService, lastRender,
localStorage)
return flamenco return flamenco
} }
@ -207,6 +210,7 @@ func buildWebService(
ssdp *upnp_ssdp.Server, ssdp *upnp_ssdp.Server,
webUpdater *webupdates.BiDirComms, webUpdater *webupdates.BiDirComms,
ownURLs []url.URL, ownURLs []url.URL,
localStorage local_storage.StorageInfo,
) *echo.Echo { ) *echo.Echo {
e := echo.New() e := echo.New()
e.HideBanner = true e.HideBanner = true
@ -310,6 +314,13 @@ func buildWebService(
e.GET("/favicon.png", echo.WrapHandler(webAppHandler)) e.GET("/favicon.png", echo.WrapHandler(webAppHandler))
e.GET("/favicon.ico", echo.WrapHandler(webAppHandler)) e.GET("/favicon.ico", echo.WrapHandler(webAppHandler))
// Serve job-specific files (last-rendered image, task logs) directly from disk.
log.Info().
Str("onDisk", localStorage.Root()).
Str("url", api_impl.JobFilesURLPrefix).
Msg("serving job-specific files directly from disk")
e.Static(api_impl.JobFilesURLPrefix, localStorage.Root())
// Redirect / to the webapp. // Redirect / to the webapp.
e.GET("/", func(c echo.Context) error { e.GET("/", func(c echo.Context) error {
return c.Redirect(http.StatusTemporaryRedirect, "/app/") return c.Redirect(http.StatusTemporaryRedirect, "/app/")

View File

@ -24,6 +24,7 @@ type Flamenco struct {
shaman Shaman shaman Shaman
clock TimeService clock TimeService
lastRender LastRendered lastRender LastRendered
localStorage LocalStorage
// The task scheduler can be locked to prevent multiple Workers from getting // The task scheduler can be locked to prevent multiple Workers from getting
// the same task. It is also used for certain other queries, like // the same task. It is also used for certain other queries, like
@ -38,23 +39,25 @@ func NewFlamenco(
jc JobCompiler, jc JobCompiler,
jps PersistenceService, jps PersistenceService,
b ChangeBroadcaster, b ChangeBroadcaster,
ls LogStorage, logStorage LogStorage,
cs ConfigService, cs ConfigService,
sm TaskStateMachine, sm TaskStateMachine,
sha Shaman, sha Shaman,
ts TimeService, ts TimeService,
lr LastRendered, lr LastRendered,
localStorage LocalStorage,
) *Flamenco { ) *Flamenco {
return &Flamenco{ return &Flamenco{
jobCompiler: jc, jobCompiler: jc,
persist: jps, persist: jps,
broadcaster: b, broadcaster: b,
logStorage: ls, logStorage: logStorage,
config: cs, config: cs,
stateMachine: sm, stateMachine: sm,
shaman: sha, shaman: sha,
clock: ts, clock: ts,
lastRender: lr, lastRender: lr,
localStorage: localStorage,
} }
} }

View File

@ -24,7 +24,7 @@ import (
) )
// Generate mock implementations of these interfaces. // Generate mock implementations of these interfaces.
//go:generate go run github.com/golang/mock/mockgen -destination mocks/api_impl_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/api_impl PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered //go:generate go run github.com/golang/mock/mockgen -destination mocks/api_impl_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/api_impl PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered,LocalStorage
type PersistenceService interface { type PersistenceService interface {
StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error
@ -124,6 +124,19 @@ type LastRendered interface {
// `last_rendered.ErrQueueFull` if there is no more space in the queue for // `last_rendered.ErrQueueFull` if there is no more space in the queue for
// new images. // new images.
QueueImage(payload last_rendered.Payload) error QueueImage(payload last_rendered.Payload) error
// PathForJob returns the base path for this job's last-rendered images.
PathForJob(jobUUID string) string
// ThumbSpecs returns the thumbnail specifications.
ThumbSpecs() []last_rendered.Thumbspec
}
// LocalStorage handles the storage organisation of local files like the last-rendered images.
type LocalStorage interface {
// RelPath tries to make the given path relative to the local storage root.
// Assumes `path` is already an absolute path.
RelPath(path string) (string, error)
} }
type ConfigService interface { type ConfigService interface {

View File

@ -7,6 +7,7 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"os" "os"
"path"
"github.com/labstack/echo/v4" "github.com/labstack/echo/v4"
@ -17,6 +18,11 @@ import (
"git.blender.org/flamenco/pkg/api" "git.blender.org/flamenco/pkg/api"
) )
// JobFilesURLPrefix is the URL prefix that the Flamenco API expects to serve
// the job-specific local files, i.e. the ones that are managed by
// `local_storage.StorageInfo`.
const JobFilesURLPrefix = "/job-files"
func (f *Flamenco) GetJobTypes(e echo.Context) error { func (f *Flamenco) GetJobTypes(e echo.Context) error {
logger := requestLogger(e) logger := requestLogger(e)
@ -305,6 +311,36 @@ func (f *Flamenco) RemoveJobBlocklist(e echo.Context, jobID string) error {
return e.NoContent(http.StatusNoContent) return e.NoContent(http.StatusNoContent)
} }
func (f *Flamenco) FetchJobLastRenderedInfo(e echo.Context, jobID string) error {
if !uuid.IsValid(jobID) {
return sendAPIError(e, http.StatusBadRequest, "job ID should be a UUID")
}
logger := requestLogger(e)
basePath := f.lastRender.PathForJob(jobID)
relPath, err := f.localStorage.RelPath(basePath)
if err != nil {
logger.Error().
Str("job", jobID).
Str("renderPath", basePath).
Err(err).
Msg("last-rendered path for this job is outside the local storage")
return sendAPIError(e, http.StatusInternalServerError, "error finding job storage path: %v", err)
}
suffixes := []string{}
for _, spec := range f.lastRender.ThumbSpecs() {
suffixes = append(suffixes, spec.Filename)
}
info := api.JobLastRenderedImageInfo{
Base: path.Join(JobFilesURLPrefix, relPath),
Suffixes: suffixes,
}
return e.JSON(http.StatusOK, info)
}
func jobDBtoAPI(dbJob *persistence.Job) api.Job { func jobDBtoAPI(dbJob *persistence.Job) api.Job {
apiJob := api.Job{ apiJob := api.Job{
SubmittedJob: api.SubmittedJob{ SubmittedJob: api.SubmittedJob{

View File

@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT. // Code generated by MockGen. DO NOT EDIT.
// Source: git.blender.org/flamenco/internal/manager/api_impl (interfaces: PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered) // Source: git.blender.org/flamenco/internal/manager/api_impl (interfaces: PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered,LocalStorage)
// Package mocks is a generated GoMock package. // Package mocks is a generated GoMock package.
package mocks package mocks
@ -855,6 +855,20 @@ func (m *MockLastRendered) EXPECT() *MockLastRenderedMockRecorder {
return m.recorder return m.recorder
} }
// PathForJob mocks base method.
func (m *MockLastRendered) PathForJob(arg0 string) string {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "PathForJob", arg0)
ret0, _ := ret[0].(string)
return ret0
}
// PathForJob indicates an expected call of PathForJob.
func (mr *MockLastRenderedMockRecorder) PathForJob(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PathForJob", reflect.TypeOf((*MockLastRendered)(nil).PathForJob), arg0)
}
// QueueImage mocks base method. // QueueImage mocks base method.
func (m *MockLastRendered) QueueImage(arg0 last_rendered.Payload) error { func (m *MockLastRendered) QueueImage(arg0 last_rendered.Payload) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
@ -868,3 +882,55 @@ func (mr *MockLastRenderedMockRecorder) QueueImage(arg0 interface{}) *gomock.Cal
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueueImage", reflect.TypeOf((*MockLastRendered)(nil).QueueImage), arg0) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueueImage", reflect.TypeOf((*MockLastRendered)(nil).QueueImage), arg0)
} }
// ThumbSpecs mocks base method.
func (m *MockLastRendered) ThumbSpecs() []last_rendered.Thumbspec {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ThumbSpecs")
ret0, _ := ret[0].([]last_rendered.Thumbspec)
return ret0
}
// ThumbSpecs indicates an expected call of ThumbSpecs.
func (mr *MockLastRenderedMockRecorder) ThumbSpecs() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ThumbSpecs", reflect.TypeOf((*MockLastRendered)(nil).ThumbSpecs))
}
// MockLocalStorage is a mock of LocalStorage interface.
type MockLocalStorage struct {
ctrl *gomock.Controller
recorder *MockLocalStorageMockRecorder
}
// MockLocalStorageMockRecorder is the mock recorder for MockLocalStorage.
type MockLocalStorageMockRecorder struct {
mock *MockLocalStorage
}
// NewMockLocalStorage creates a new mock instance.
func NewMockLocalStorage(ctrl *gomock.Controller) *MockLocalStorage {
mock := &MockLocalStorage{ctrl: ctrl}
mock.recorder = &MockLocalStorageMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockLocalStorage) EXPECT() *MockLocalStorageMockRecorder {
return m.recorder
}
// RelPath mocks base method.
func (m *MockLocalStorage) RelPath(arg0 string) (string, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "RelPath", arg0)
ret0, _ := ret[0].(string)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// RelPath indicates an expected call of RelPath.
func (mr *MockLocalStorageMockRecorder) RelPath(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RelPath", reflect.TypeOf((*MockLocalStorage)(nil).RelPath), arg0)
}

View File

@ -33,17 +33,19 @@ type mockedFlamenco struct {
shaman *mocks.MockShaman shaman *mocks.MockShaman
clock *clock.Mock clock *clock.Mock
lastRender *mocks.MockLastRendered lastRender *mocks.MockLastRendered
localStorage *mocks.MockLocalStorage
} }
func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco { func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco {
jc := mocks.NewMockJobCompiler(mockCtrl) jc := mocks.NewMockJobCompiler(mockCtrl)
ps := mocks.NewMockPersistenceService(mockCtrl) ps := mocks.NewMockPersistenceService(mockCtrl)
cb := mocks.NewMockChangeBroadcaster(mockCtrl) cb := mocks.NewMockChangeBroadcaster(mockCtrl)
ls := mocks.NewMockLogStorage(mockCtrl) logStore := mocks.NewMockLogStorage(mockCtrl)
cs := mocks.NewMockConfigService(mockCtrl) cs := mocks.NewMockConfigService(mockCtrl)
sm := mocks.NewMockTaskStateMachine(mockCtrl) sm := mocks.NewMockTaskStateMachine(mockCtrl)
sha := mocks.NewMockShaman(mockCtrl) sha := mocks.NewMockShaman(mockCtrl)
lr := mocks.NewMockLastRendered(mockCtrl) lr := mocks.NewMockLastRendered(mockCtrl)
localStore := mocks.NewMockLocalStorage(mockCtrl)
clock := clock.NewMock() clock := clock.NewMock()
mockedNow, err := time.Parse(time.RFC3339, "2022-06-09T11:14:41+02:00") mockedNow, err := time.Parse(time.RFC3339, "2022-06-09T11:14:41+02:00")
@ -52,14 +54,14 @@ func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco {
} }
clock.Set(mockedNow) clock.Set(mockedNow)
f := NewFlamenco(jc, ps, cb, ls, cs, sm, sha, clock, lr) f := NewFlamenco(jc, ps, cb, logStore, cs, sm, sha, clock, lr, localStore)
return mockedFlamenco{ return mockedFlamenco{
flamenco: f, flamenco: f,
jobCompiler: jc, jobCompiler: jc,
persistence: ps, persistence: ps,
broadcaster: cb, broadcaster: cb,
logStorage: ls, logStorage: logStore,
config: cs, config: cs,
stateMachine: sm, stateMachine: sm,
clock: clock, clock: clock,

View File

@ -74,10 +74,10 @@ func saveJPEG(targetpath string, img image.Image) error {
return nil return nil
} }
func downscaleImage(spec thumbspec, img image.Image) image.Image { func downscaleImage(spec Thumbspec, img image.Image) image.Image {
// Fill out the entire frame, cropping the image if necessary: // Fill out the entire frame, cropping the image if necessary:
// return imaging.Fill(img, spec.maxWidth, spec.maxHeight, imaging.Center, imaging.Lanczos) // return imaging.Fill(img, spec.maxWidth, spec.maxHeight, imaging.Center, imaging.Lanczos)
// Fit the image to the frame, potentially resulting in either a narrower or lower image: // Fit the image to the frame, potentially resulting in either a narrower or lower image:
return imaging.Fit(img, spec.maxWidth, spec.maxHeight, imaging.Lanczos) return imaging.Fit(img, spec.MaxWidth, spec.MaxHeight, imaging.Lanczos)
} }

View File

@ -28,10 +28,10 @@ var (
// thumbnails specifies the thumbnail sizes. For efficiency, they should be // thumbnails specifies the thumbnail sizes. For efficiency, they should be
// listed from large to small, as each thumbnail is the input for the next // listed from large to small, as each thumbnail is the input for the next
// one. // one.
thumbnails = []thumbspec{ thumbnails = []Thumbspec{
{"last-rendered.jpg", 1920, 1080}, {"last-rendered.jpg", 1920, 1080},
{"last-rendered-small.jpg", 600, 338}, {"last-rendered-small.jpg", 600, 338},
{"last-rendered-tiny.jpg", 48, 28}, {"last-rendered-tiny.jpg", 200, 112},
} }
) )
@ -60,11 +60,11 @@ type Payload struct {
Image []byte Image []byte
} }
// thumbspec specifies a thumbnail size & filename. // Thumbspec specifies a thumbnail size & filename.
type thumbspec struct { type Thumbspec struct {
filename string Filename string
maxWidth int MaxWidth int
maxHeight int MaxHeight int
} }
func New(storage Storage) *LastRenderedProcessor { func New(storage Storage) *LastRenderedProcessor {
@ -112,13 +112,27 @@ func (lrp *LastRenderedProcessor) QueueImage(payload Payload) error {
} }
} }
// PathForJob returns the base path for this job's last-rendered images.
func (lrp *LastRenderedProcessor) PathForJob(jobUUID string) string {
return lrp.storage.ForJob(jobUUID)
}
// ThumbSpecs returns the thumbnail specifications.
func (lrp *LastRenderedProcessor) ThumbSpecs() []Thumbspec {
// Return a copy so modification of the returned slice won't affect the global
// `thumbnails` variable.
copied := make([]Thumbspec, len(thumbnails))
copy(copied, thumbnails)
return copied
}
// processImage down-scales the image to a few thumbnails for presentation in // processImage down-scales the image to a few thumbnails for presentation in
// the web interface, and stores those in a job-specific directory. // the web interface, and stores those in a job-specific directory.
// //
// Because this is intended as internal queue-processing function, errors are // Because this is intended as internal queue-processing function, errors are
// logged but not returned. // logged but not returned.
func (lrp *LastRenderedProcessor) processImage(payload Payload) { func (lrp *LastRenderedProcessor) processImage(payload Payload) {
jobDir := lrp.storage.ForJob(payload.JobUUID) jobDir := lrp.PathForJob(payload.JobUUID)
logger := log.With().Str("jobDir", jobDir).Logger() logger := log.With().Str("jobDir", jobDir).Logger()
logger = payload.sublogger(logger) logger = payload.sublogger(logger)
@ -137,7 +151,7 @@ func (lrp *LastRenderedProcessor) processImage(payload Payload) {
image = downscaleImage(spec, image) image = downscaleImage(spec, image)
imgpath := filepath.Join(jobDir, spec.filename) imgpath := filepath.Join(jobDir, spec.Filename)
if err := saveJPEG(imgpath, image); err != nil { if err := saveJPEG(imgpath, image); err != nil {
thumbLogger.Error().Err(err).Msg("last-rendered: error saving thumbnail") thumbLogger.Error().Err(err).Msg("last-rendered: error saving thumbnail")
break break
@ -158,10 +172,10 @@ func (p Payload) sublogger(logger zerolog.Logger) zerolog.Logger {
Logger() Logger()
} }
func (spec thumbspec) sublogger(logger zerolog.Logger) zerolog.Logger { func (spec Thumbspec) sublogger(logger zerolog.Logger) zerolog.Logger {
return logger.With(). return logger.With().
Int("width", spec.maxWidth). Int("width", spec.MaxWidth).
Int("height", spec.maxHeight). Int("height", spec.MaxHeight).
Str("filename", spec.filename). Str("filename", spec.Filename).
Logger() Logger()
} }

View File

@ -89,22 +89,22 @@ func TestProcessImage(t *testing.T) {
assert.Equal(t, callbackCount, 1, "the 'done' callback should be called exactly once") assert.Equal(t, callbackCount, 1, "the 'done' callback should be called exactly once")
// Check the sizes, they should match the thumbspec. // Check the sizes, they should match the thumbspec.
assertImageSize := func(spec thumbspec) { assertImageSize := func(spec Thumbspec) {
path := filepath.Join(jobdir, spec.filename) path := filepath.Join(jobdir, spec.Filename)
file, err := os.Open(path) file, err := os.Open(path)
if !assert.NoError(t, err, "thumbnail %s should be openable", spec.filename) { if !assert.NoError(t, err, "thumbnail %s should be openable", spec.Filename) {
return return
} }
defer file.Close() defer file.Close()
img, format, err := image.Decode(file) img, format, err := image.Decode(file)
if !assert.NoErrorf(t, err, "thumbnail %s should be decodable", spec.filename) { if !assert.NoErrorf(t, err, "thumbnail %s should be decodable", spec.Filename) {
return return
} }
assert.Equalf(t, "jpeg", format, "thumbnail %s not written in the expected format", spec.filename) assert.Equalf(t, "jpeg", format, "thumbnail %s not written in the expected format", spec.Filename)
assert.LessOrEqualf(t, img.Bounds().Dx(), spec.maxWidth, "thumbnail %s has wrong width", spec.filename) assert.LessOrEqualf(t, img.Bounds().Dx(), spec.MaxWidth, "thumbnail %s has wrong width", spec.Filename)
assert.LessOrEqualf(t, img.Bounds().Dy(), spec.maxHeight, "thumbnail %s has wrong height", spec.filename) assert.LessOrEqualf(t, img.Bounds().Dy(), spec.MaxHeight, "thumbnail %s has wrong height", spec.Filename)
} }
for _, spec := range thumbnails { for _, spec := range thumbnails {

View File

@ -27,9 +27,14 @@ func NewNextToExe(subdir string) StorageInfo {
} }
} }
// ForJob returns the directory path for storing job-related files. // Root returns the root path of the storage.
func (si StorageInfo) Root() string {
return si.rootPath
}
// ForJob returns the absolute directory path for storing job-related files.
func (si StorageInfo) ForJob(jobUUID string) string { func (si StorageInfo) ForJob(jobUUID string) string {
return filepath.Join(si.rootPath, pathForJob(jobUUID)) return filepath.Join(si.rootPath, relPathForJob(jobUUID))
} }
// Erase removes the entire storage directory from disk. // Erase removes the entire storage directory from disk.
@ -59,10 +64,16 @@ func (si StorageInfo) MustErase() {
} }
} }
// RelPath tries to make the given path relative to the local storage root.
// Assumes `path` is already an absolute path.
func (si StorageInfo) RelPath(path string) (string, error) {
return filepath.Rel(si.rootPath, path)
}
// Returns a sub-directory suitable for files of this job. // Returns a sub-directory suitable for files of this job.
// Note that this is intentionally in sync with the `filepath()` function in // Note that this is intentionally in sync with the `filepath()` function in
// `internal/manager/task_logs/task_logs.go`. // `internal/manager/task_logs/task_logs.go`.
func pathForJob(jobUUID string) string { func relPathForJob(jobUUID string) string {
if jobUUID == "" { if jobUUID == "" {
return "jobless" return "jobless"
} }