diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index d1e76489..d118f6c8 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -88,6 +88,7 @@ var _ TaskStateMachine = (*task_state_machine.StateMachine)(nil) type ChangeBroadcaster interface { // BroadcastNewJob sends a 'new job' notification to all SocketIO clients. BroadcastNewJob(jobUpdate api.SocketIOJobUpdate) + BroadcastLastRenderedImage(update api.SocketIOLastRenderedUpdate) // Note that there is no BroadcastNewTask. The 'new job' broadcast is sent // after the job's tasks have been created, and thus there is no need for a diff --git a/internal/manager/api_impl/jobs.go b/internal/manager/api_impl/jobs.go index 5134afb0..85fb985b 100644 --- a/internal/manager/api_impl/jobs.go +++ b/internal/manager/api_impl/jobs.go @@ -10,6 +10,7 @@ import ( "path" "github.com/labstack/echo/v4" + "github.com/rs/zerolog" "git.blender.org/flamenco/internal/manager/job_compilers" "git.blender.org/flamenco/internal/manager/persistence" @@ -317,16 +318,25 @@ func (f *Flamenco) FetchJobLastRenderedInfo(e echo.Context, jobID string) error } logger := requestLogger(e) - - basePath := f.lastRender.PathForJob(jobID) - relPath, err := f.localStorage.RelPath(basePath) + info, err := f.lastRenderedInfoForJob(logger, jobID) if err != nil { logger.Error(). Str("job", jobID). - Str("renderPath", basePath). Err(err). - Msg("last-rendered path for this job is outside the local storage") - return sendAPIError(e, http.StatusInternalServerError, "error finding job storage path: %v", err) + Msg("error getting last-rendered info") + return sendAPIError(e, http.StatusInternalServerError, "error finding last-rendered info: %v", err) + } + + return e.JSON(http.StatusOK, info) +} + +func (f *Flamenco) lastRenderedInfoForJob(logger zerolog.Logger, jobUUID string) (*api.JobLastRenderedImageInfo, error) { + basePath := f.lastRender.PathForJob(jobUUID) + relPath, err := f.localStorage.RelPath(basePath) + if err != nil { + return nil, fmt.Errorf( + "last-rendered path for job %s is %q, which is outside local storage root: %w", + jobUUID, basePath, err) } suffixes := []string{} @@ -338,7 +348,7 @@ func (f *Flamenco) FetchJobLastRenderedInfo(e echo.Context, jobID string) error Base: path.Join(JobFilesURLPrefix, relPath), Suffixes: suffixes, } - return e.JSON(http.StatusOK, info) + return &info, nil } func jobDBtoAPI(dbJob *persistence.Job) api.Job { diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index be05f9f7..0145bb24 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -412,6 +412,18 @@ func (m *MockChangeBroadcaster) EXPECT() *MockChangeBroadcasterMockRecorder { return m.recorder } +// BroadcastLastRenderedImage mocks base method. +func (m *MockChangeBroadcaster) BroadcastLastRenderedImage(arg0 api.SocketIOLastRenderedUpdate) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "BroadcastLastRenderedImage", arg0) +} + +// BroadcastLastRenderedImage indicates an expected call of BroadcastLastRenderedImage. +func (mr *MockChangeBroadcasterMockRecorder) BroadcastLastRenderedImage(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastLastRenderedImage", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastLastRenderedImage), arg0) +} + // BroadcastNewJob mocks base method. func (m *MockChangeBroadcaster) BroadcastNewJob(arg0 api.SocketIOJobUpdate) { m.ctrl.T.Helper() diff --git a/internal/manager/api_impl/support_test.go b/internal/manager/api_impl/support_test.go index fe28d029..85643aec 100644 --- a/internal/manager/api_impl/support_test.go +++ b/internal/manager/api_impl/support_test.go @@ -66,6 +66,7 @@ func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco { stateMachine: sm, clock: clock, lastRender: lr, + localStorage: localStore, } } diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go index 95667dce..f977458b 100644 --- a/internal/manager/api_impl/workers.go +++ b/internal/manager/api_impl/workers.go @@ -396,11 +396,24 @@ func (f *Flamenco) TaskOutputProduced(e echo.Context, taskID string) error { } // Create the "last rendered" payload. + jobUUID := dbTask.Job.UUID + thumbnailInfo, err := f.lastRenderedInfoForJob(logger, jobUUID) + if err != nil { + logger.Error().Err(err).Msg("TaskOutputProduced: error getting last-rendered thumbnail info for job") + return sendAPIError(e, http.StatusInternalServerError, "error getting last-rendered thumbnail info for job: %v", err) + } payload := last_rendered.Payload{ - JobUUID: dbTask.Job.UUID, + JobUUID: jobUUID, WorkerUUID: worker.UUID, MimeType: e.Request().Header.Get("Content-Type"), Image: imageBytes, + + Callback: func() { + // Broadcast when the processing is done. + update := webupdates.NewLastRenderedUpdate(jobUUID) + update.Thumbnail = *thumbnailInfo + f.broadcaster.BroadcastLastRenderedImage(update) + }, } // Queue the image for processing: diff --git a/internal/manager/api_impl/workers_test.go b/internal/manager/api_impl/workers_test.go index 9ded970e..85e09d63 100644 --- a/internal/manager/api_impl/workers_test.go +++ b/internal/manager/api_impl/workers_test.go @@ -484,6 +484,13 @@ func TestTaskOutputProduced(t *testing.T) { // Mock body to use in the request. bodyBytes := []byte("JPEG file contents") + mf.lastRender.EXPECT().ThumbSpecs().Return([]last_rendered.Thumbspec{ + {Filename: "big'un.jpg", MaxWidth: 1920, MaxHeight: 1080}, + {Filename: "tiny.jpg", MaxWidth: 320, MaxHeight: 240}, + }).AnyTimes() + mf.lastRender.EXPECT().PathForJob(job.UUID).Return("/path/to/job").AnyTimes() + mf.localStorage.EXPECT().RelPath("/path/to/job").Return("relative/path/to/job", nil).AnyTimes() + // Test: unhappy, missing Content-Length header. { mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker) @@ -550,11 +557,33 @@ func TestTaskOutputProduced(t *testing.T) { MimeType: "image/jpeg", Image: bodyBytes, } - mf.lastRender.EXPECT().QueueImage(expectPayload).Return(nil) + var actualPayload *last_rendered.Payload + mf.lastRender.EXPECT().QueueImage(gomock.Any()).DoAndReturn(func(payload last_rendered.Payload) error { + actualPayload = &payload + return nil + }) err := mf.flamenco.TaskOutputProduced(echo, task.UUID) assert.NoError(t, err) assertResponseNoBody(t, echo, http.StatusAccepted) + + if assert.NotNil(t, actualPayload) { + // Calling the callback function is normally done by the last-rendered image processor. + // It should result in a SocketIO broadcast. + expectBroadcast := api.SocketIOLastRenderedUpdate{ + JobId: job.UUID, + Thumbnail: api.JobLastRenderedImageInfo{ + Base: "/job-files/relative/path/to/job", + Suffixes: []string{"big'un.jpg", "tiny.jpg"}, + }, + } + mf.broadcaster.EXPECT().BroadcastLastRenderedImage(expectBroadcast) + actualPayload.Callback() + + // Compare the parameter to `QueueImage()` in a way that ignores the callback function. + actualPayload.Callback = nil + assert.Equal(t, expectPayload, *actualPayload) + } } } diff --git a/internal/manager/last_rendered/last_rendered.go b/internal/manager/last_rendered/last_rendered.go index fba38f30..1f0fb01c 100644 --- a/internal/manager/last_rendered/last_rendered.go +++ b/internal/manager/last_rendered/last_rendered.go @@ -48,8 +48,6 @@ type LastRenderedProcessor struct { // TODO: expand this queue to be per job, so that one spammy job doesn't block // the queue for other jobs. queue chan Payload - - processingDonecallback func(jobUUID string) } // Payload contains the actual image to process. @@ -58,6 +56,9 @@ type Payload struct { WorkerUUID string // Just for logging. MimeType string Image []byte + + // Callback is called when the image processing is finished. + Callback func() } // Thumbspec specifies a thumbnail size & filename. @@ -74,14 +75,6 @@ func New(storage Storage) *LastRenderedProcessor { } } -// SetCallback registers a 'done' callback, which will be called after the job -// received a new last-rendered image. -// There is only one such callback, so calling this will overwrite the -// previously-set callback function. Pass `nil` to un-set. -func (lrp *LastRenderedProcessor) SetCallback(processingDonecallback func(jobUUID string)) { - lrp.processingDonecallback = processingDonecallback -} - // Run is the main loop for the processing of images. It will keep running until // the context is closed. func (lrp *LastRenderedProcessor) Run(ctx context.Context) { @@ -159,8 +152,8 @@ func (lrp *LastRenderedProcessor) processImage(payload Payload) { } // Call the callback, if provided. - if lrp.processingDonecallback != nil { - lrp.processingDonecallback(payload.JobUUID) + if payload.Callback != nil { + payload.Callback() } } diff --git a/internal/manager/last_rendered/last_rendered_test.go b/internal/manager/last_rendered/last_rendered_test.go index 2e052611..ac15314e 100644 --- a/internal/manager/last_rendered/last_rendered_test.go +++ b/internal/manager/last_rendered/last_rendered_test.go @@ -16,14 +16,9 @@ func TestNew(t *testing.T) { storage := local_storage.NewNextToExe("lrp") defer storage.MustErase() - callback := func(string) {} lrp := New(storage) assert.Equal(t, lrp.storage, storage) assert.NotNil(t, lrp.queue) - assert.Nil(t, lrp.processingDonecallback) - - lrp.SetCallback(callback) - assert.NotNil(t, lrp.processingDonecallback) } func TestQueueImage(t *testing.T) { @@ -68,10 +63,9 @@ func TestProcessImage(t *testing.T) { lrp := New(storage) callbackCount := 0 - lrp.SetCallback(func(callbackJobID string) { - assert.Equal(t, jobID, callbackJobID) + payload.Callback = func() { callbackCount++ - }) + } // Sanity check: the thumbnails shouldn't exist yet. jobdir := storage.ForJob(jobID) diff --git a/internal/manager/webupdates/job_updates.go b/internal/manager/webupdates/job_updates.go index 96068877..cbced2e7 100644 --- a/internal/manager/webupdates/job_updates.go +++ b/internal/manager/webupdates/job_updates.go @@ -42,6 +42,15 @@ func NewTaskUpdate(task *persistence.Task) api.SocketIOTaskUpdate { return taskUpdate } +// NewLastRenderedUpdate returns a partial SocketIOLastRenderedUpdate struct. +// The `Thumbnail` field still needs to be filled in, but that requires +// information from the `api_impl.Flamenco` service. +func NewLastRenderedUpdate(jobUUID string) api.SocketIOLastRenderedUpdate { + return api.SocketIOLastRenderedUpdate{ + JobId: jobUUID, + } +} + // NewTaskLogUpdate returns a SocketIOTaskLogUpdate for the given task. func NewTaskLogUpdate(taskUUID string, logchunk string) api.SocketIOTaskLogUpdate { return api.SocketIOTaskLogUpdate{ @@ -76,6 +85,13 @@ func (b *BiDirComms) BroadcastTaskUpdate(taskUpdate api.SocketIOTaskUpdate) { b.BroadcastTo(room, SIOEventTaskUpdate, taskUpdate) } +// BroadcastLastRenderedImage sends the 'last-rendered' update to clients. +func (b *BiDirComms) BroadcastLastRenderedImage(update api.SocketIOLastRenderedUpdate) { + log.Debug().Interface("lastRenderedUpdate", update).Msg("socketIO: broadcasting last-rendered image update") + room := roomForJob(update.JobId) + b.BroadcastTo(room, SIOEventLastRenderedUpdate, update) +} + // BroadcastTaskLogUpdate sends the task log chunk to clients. func (b *BiDirComms) BroadcastTaskLogUpdate(taskLogUpdate api.SocketIOTaskLogUpdate) { // Don't log the contents here; logs can get big. diff --git a/internal/manager/webupdates/sio_rooms.go b/internal/manager/webupdates/sio_rooms.go index 9aef9c92..9ad416f4 100644 --- a/internal/manager/webupdates/sio_rooms.go +++ b/internal/manager/webupdates/sio_rooms.go @@ -28,13 +28,14 @@ const ( const ( // Predefined SocketIO event types. - SIOEventChatMessageRcv SocketIOEventType = "/chat" // clients send chat messages here - SIOEventChatMessageSend SocketIOEventType = "/message" // chat messages are broadcasted here - SIOEventJobUpdate SocketIOEventType = "/jobs" // sends api.SocketIOJobUpdate - SIOEventTaskUpdate SocketIOEventType = "/task" // sends api.SocketIOTaskUpdate - SIOEventTaskLogUpdate SocketIOEventType = "/tasklog" // sends api.SocketIOTaskLogUpdate - SIOEventWorkerUpdate SocketIOEventType = "/workers" // sends api.SocketIOWorkerUpdate - SIOEventSubscription SocketIOEventType = "/subscription" // clients send api.SocketIOSubscription + SIOEventChatMessageRcv SocketIOEventType = "/chat" // clients send chat messages here + SIOEventChatMessageSend SocketIOEventType = "/message" // chat messages are broadcasted here + SIOEventJobUpdate SocketIOEventType = "/jobs" // sends api.SocketIOJobUpdate + SIOEventLastRenderedUpdate SocketIOEventType = "/last-rendered" // sends api.SocketIOLastRenderedUpdate + SIOEventTaskUpdate SocketIOEventType = "/task" // sends api.SocketIOTaskUpdate + SIOEventTaskLogUpdate SocketIOEventType = "/tasklog" // sends api.SocketIOTaskLogUpdate + SIOEventWorkerUpdate SocketIOEventType = "/workers" // sends api.SocketIOWorkerUpdate + SIOEventSubscription SocketIOEventType = "/subscription" // clients send api.SocketIOSubscription ) func (b *BiDirComms) BroadcastTo(room SocketIORoomName, eventType SocketIOEventType, payload interface{}) {