diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index 60087b73..f7d5927a 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -67,6 +67,11 @@ type PersistenceService interface { // Database queries. QueryJobs(ctx context.Context, query api.JobsQuery) ([]*persistence.Job, error) QueryJobTaskSummaries(ctx context.Context, jobUUID string) ([]*persistence.Task, error) + + // SetLastRendered sets this job as the one with the most recent rendered image. + SetLastRendered(ctx context.Context, j *persistence.Job) error + // GetLastRendered returns the UUID of the job with the most recent rendered image. + GetLastRenderedJobUUID(ctx context.Context) (string, error) } var _ PersistenceService = (*persistence.DB)(nil) diff --git a/internal/manager/api_impl/jobs.go b/internal/manager/api_impl/jobs.go index 813653ff..7f501d0f 100644 --- a/internal/manager/api_impl/jobs.go +++ b/internal/manager/api_impl/jobs.go @@ -334,6 +334,23 @@ func (f *Flamenco) FetchJobLastRenderedInfo(e echo.Context, jobID string) error return e.JSON(http.StatusOK, info) } +func (f *Flamenco) FetchGlobalLastRenderedInfo(e echo.Context) error { + ctx := e.Request().Context() + logger := requestLogger(e) + + jobUUID, err := f.persist.GetLastRenderedJobUUID(ctx) + if err != nil { + logger.Error().Err(err).Msg("error getting job UUID with last-rendered image") + return sendAPIError(e, http.StatusInternalServerError, "error finding global last-rendered info: %v", err) + } + + if jobUUID == "" { + return e.NoContent(http.StatusNoContent) + } + + return f.FetchJobLastRenderedInfo(e, jobUUID) +} + func (f *Flamenco) lastRenderedInfoForJob(logger zerolog.Logger, jobUUID string) (*api.JobLastRenderedImageInfo, error) { basePath := f.lastRender.PathForJob(jobUUID) relPath, err := f.localStorage.RelPath(basePath) diff --git a/internal/manager/api_impl/jobs_test.go b/internal/manager/api_impl/jobs_test.go index 3e2c5fb6..ab57101b 100644 --- a/internal/manager/api_impl/jobs_test.go +++ b/internal/manager/api_impl/jobs_test.go @@ -353,3 +353,45 @@ func TestFetchJobLastRenderedInfo(t *testing.T) { assertResponseNoContent(t, echoCtx) } } + +func TestFetchGlobalLastRenderedInfo(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mf := newMockedFlamenco(mockCtrl) + + jobUUID := "18a9b096-d77e-438c-9be2-74397038298b" + + { + // No last-rendered image exists yet. + mf.persistence.EXPECT().GetLastRenderedJobUUID(gomock.Any()).Return("", nil) + + echoCtx := mf.prepareMockedRequest(nil) + err := mf.flamenco.FetchGlobalLastRenderedInfo(echoCtx) + assert.NoError(t, err) + assertResponseNoContent(t, echoCtx) + } + + { + // Last-rendered image has been processed. + mf.persistence.EXPECT().GetLastRenderedJobUUID(gomock.Any()).Return(jobUUID, nil) + mf.lastRender.EXPECT().JobHasImage(jobUUID).Return(true) + mf.lastRender.EXPECT().PathForJob(jobUUID).Return("/absolute/path/to/local/job/dir") + mf.localStorage.EXPECT().RelPath("/absolute/path/to/local/job/dir").Return("relative/path", nil) + mf.lastRender.EXPECT().ThumbSpecs().Return([]last_rendered.Thumbspec{ + {Filename: "das grosses potaat.jpg"}, + {Filename: "invisibru.jpg"}, + }) + + echoCtx := mf.prepareMockedRequest(nil) + err := mf.flamenco.FetchGlobalLastRenderedInfo(echoCtx) + assert.NoError(t, err) + + expectBody := api.JobLastRenderedImageInfo{ + Base: "/job-files/relative/path", + Suffixes: []string{"das grosses potaat.jpg", "invisibru.jpg"}, + } + assertResponseJSON(t, echoCtx, http.StatusOK, expectBody) + } + +} diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index 4667696f..0e638040 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -217,6 +217,21 @@ func (mr *MockPersistenceServiceMockRecorder) FetchWorkers(arg0 interface{}) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorkers", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorkers), arg0) } +// GetLastRenderedJobUUID mocks base method. +func (m *MockPersistenceService) GetLastRenderedJobUUID(arg0 context.Context) (string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetLastRenderedJobUUID", arg0) + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetLastRenderedJobUUID indicates an expected call of GetLastRenderedJobUUID. +func (mr *MockPersistenceServiceMockRecorder) GetLastRenderedJobUUID(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLastRenderedJobUUID", reflect.TypeOf((*MockPersistenceService)(nil).GetLastRenderedJobUUID), arg0) +} + // QueryJobTaskSummaries mocks base method. func (m *MockPersistenceService) QueryJobTaskSummaries(arg0 context.Context, arg1 string) ([]*persistence.Task, error) { m.ctrl.T.Helper() @@ -332,6 +347,20 @@ func (mr *MockPersistenceServiceMockRecorder) ScheduleTask(arg0, arg1 interface{ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScheduleTask", reflect.TypeOf((*MockPersistenceService)(nil).ScheduleTask), arg0, arg1) } +// SetLastRendered mocks base method. +func (m *MockPersistenceService) SetLastRendered(arg0 context.Context, arg1 *persistence.Job) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetLastRendered", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetLastRendered indicates an expected call of SetLastRendered. +func (mr *MockPersistenceServiceMockRecorder) SetLastRendered(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetLastRendered", reflect.TypeOf((*MockPersistenceService)(nil).SetLastRendered), arg0, arg1) +} + // StoreAuthoredJob mocks base method. func (m *MockPersistenceService) StoreAuthoredJob(arg0 context.Context, arg1 job_compilers.AuthoredJob) error { m.ctrl.T.Helper() diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go index f977458b..92a8eaa2 100644 --- a/internal/manager/api_impl/workers.go +++ b/internal/manager/api_impl/workers.go @@ -408,7 +408,10 @@ func (f *Flamenco) TaskOutputProduced(e echo.Context, taskID string) error { MimeType: e.Request().Header.Get("Content-Type"), Image: imageBytes, - Callback: func() { + Callback: func(ctx context.Context) { + // Store this job as the last one to get a rendered image. + f.persist.SetLastRendered(ctx, dbTask.Job) + // Broadcast when the processing is done. update := webupdates.NewLastRenderedUpdate(jobUUID) update.Thumbnail = *thumbnailInfo diff --git a/internal/manager/api_impl/workers_test.go b/internal/manager/api_impl/workers_test.go index 85e09d63..1401abe6 100644 --- a/internal/manager/api_impl/workers_test.go +++ b/internal/manager/api_impl/workers_test.go @@ -548,6 +548,8 @@ func TestTaskOutputProduced(t *testing.T) { { mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker) mf.persistence.EXPECT().FetchTask(gomock.Any(), task.UUID).Return(&task, nil) + // Don't expect persistence.SetLastRendered(...) quite yet. That should be + // called after the image processing is done. echo := prepareRequest(bytes.NewReader(bodyBytes)) echo.Request().Header.Set("Content-Type", "image/jpeg") @@ -568,8 +570,9 @@ func TestTaskOutputProduced(t *testing.T) { assertResponseNoBody(t, echo, http.StatusAccepted) if assert.NotNil(t, actualPayload) { - // Calling the callback function is normally done by the last-rendered image processor. - // It should result in a SocketIO broadcast. + ctx := context.Background() + mf.persistence.EXPECT().SetLastRendered(ctx, &job) + expectBroadcast := api.SocketIOLastRenderedUpdate{ JobId: job.UUID, Thumbnail: api.JobLastRenderedImageInfo{ @@ -578,7 +581,9 @@ func TestTaskOutputProduced(t *testing.T) { }, } mf.broadcaster.EXPECT().BroadcastLastRenderedImage(expectBroadcast) - actualPayload.Callback() + + // Calling the callback function is normally done by the last-rendered image processor. + actualPayload.Callback(ctx) // Compare the parameter to `QueueImage()` in a way that ignores the callback function. actualPayload.Callback = nil diff --git a/internal/manager/last_rendered/last_rendered.go b/internal/manager/last_rendered/last_rendered.go index 7ea83cbb..a40c8db8 100644 --- a/internal/manager/last_rendered/last_rendered.go +++ b/internal/manager/last_rendered/last_rendered.go @@ -60,7 +60,7 @@ type Payload struct { Image []byte // Callback is called when the image processing is finished. - Callback func() + Callback func(ctx context.Context) } // Thumbspec specifies a thumbnail size & filename. @@ -88,7 +88,7 @@ func (lrp *LastRenderedProcessor) Run(ctx context.Context) { case <-ctx.Done(): return case payload := <-lrp.queue: - lrp.processImage(payload) + lrp.processImage(ctx, payload) } } } @@ -146,7 +146,7 @@ func (lrp *LastRenderedProcessor) ThumbSpecs() []Thumbspec { // // Because this is intended as internal queue-processing function, errors are // logged but not returned. -func (lrp *LastRenderedProcessor) processImage(payload Payload) { +func (lrp *LastRenderedProcessor) processImage(ctx context.Context, payload Payload) { jobDir := lrp.PathForJob(payload.JobUUID) logger := log.With().Str("jobDir", jobDir).Logger() @@ -175,7 +175,7 @@ func (lrp *LastRenderedProcessor) processImage(payload Payload) { // Call the callback, if provided. if payload.Callback != nil { - payload.Callback() + payload.Callback(ctx) } } diff --git a/internal/manager/last_rendered/last_rendered_test.go b/internal/manager/last_rendered/last_rendered_test.go index ac15314e..760451c5 100644 --- a/internal/manager/last_rendered/last_rendered_test.go +++ b/internal/manager/last_rendered/last_rendered_test.go @@ -3,6 +3,7 @@ package last_rendered // SPDX-License-Identifier: GPL-3.0-or-later import ( + "context" "image" "os" "path/filepath" @@ -63,7 +64,7 @@ func TestProcessImage(t *testing.T) { lrp := New(storage) callbackCount := 0 - payload.Callback = func() { + payload.Callback = func(context.Context) { callbackCount++ } @@ -73,7 +74,7 @@ func TestProcessImage(t *testing.T) { assert.NoFileExists(t, filepath.Join(jobdir, "last-rendered-small.jpg")) assert.NoFileExists(t, filepath.Join(jobdir, "last-rendered-tiny.jpg")) - lrp.processImage(payload) + lrp.processImage(context.Background(), payload) // The files should exist now. assert.FileExists(t, filepath.Join(jobdir, "last-rendered.jpg")) diff --git a/internal/manager/persistence/db_migration.go b/internal/manager/persistence/db_migration.go index e4fc0d2d..588fb51b 100644 --- a/internal/manager/persistence/db_migration.go +++ b/internal/manager/persistence/db_migration.go @@ -10,6 +10,7 @@ func (db *DB) migrate() error { err := db.gormDB.AutoMigrate( &Job{}, &JobBlock{}, + &LastRendered{}, &Task{}, &TaskFailure{}, &Worker{}, diff --git a/internal/manager/persistence/last_rendered.go b/internal/manager/persistence/last_rendered.go new file mode 100644 index 00000000..e51e7a98 --- /dev/null +++ b/internal/manager/persistence/last_rendered.go @@ -0,0 +1,48 @@ +package persistence + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + + "gorm.io/gorm/clause" +) + +// LastRendered only has one entry in its database table, to indicate the job +// that was the last to receive a "last rendered image" from a Worker. +// This is used to show the global last-rendered image in the web interface. +type LastRendered struct { + Model + JobID uint `gorm:"default:0"` + Job *Job `gorm:"foreignkey:JobID;references:ID;constraint:OnDelete:CASCADE"` +} + +// SetLastRendered sets this job as the one with the most recent rendered image. +func (db *DB) SetLastRendered(ctx context.Context, j *Job) error { + render := LastRendered{ + // Always use the same database ID to ensure a single entry. + Model: Model{ID: uint(1)}, + + JobID: j.ID, + Job: j, + } + + tx := db.gormDB. + WithContext(ctx). + Clauses(clause.OnConflict{UpdateAll: true}). + Create(&render) + return tx.Error +} + +// GetLastRendered returns the UUID of the job with the most recent rendered image. +func (db *DB) GetLastRenderedJobUUID(ctx context.Context) (string, error) { + job := Job{} + tx := db.gormDB.WithContext(ctx). + Joins("inner join last_rendereds LR on jobs.id = LR.job_id"). + Select("uuid"). + Find(&job) + if tx.Error != nil { + return "", jobError(tx.Error, "finding job with most rencent render") + } + return job.UUID, nil +} diff --git a/internal/manager/persistence/last_rendered_test.go b/internal/manager/persistence/last_rendered_test.go new file mode 100644 index 00000000..c8cca7f4 --- /dev/null +++ b/internal/manager/persistence/last_rendered_test.go @@ -0,0 +1,69 @@ +package persistence + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSetLastRendered(t *testing.T) { + ctx, close, db, job1, _ := jobTasksTestFixtures(t) + defer close() + + authoredJob2 := authorTestJob("1295757b-e668-4c49-8b89-f73db8270e42", "just-a-job") + job2 := persistAuthoredJob(t, ctx, db, authoredJob2) + + assert.NoError(t, db.SetLastRendered(ctx, job1)) + { + entries := []LastRendered{} + db.gormDB.Model(&LastRendered{}).Scan(&entries) + if assert.Len(t, entries, 1) { + assert.Equal(t, job1.ID, entries[0].JobID, "job 1 should be the last-rendered one") + } + } + + assert.NoError(t, db.SetLastRendered(ctx, job2)) + { + entries := []LastRendered{} + db.gormDB.Model(&LastRendered{}).Scan(&entries) + if assert.Len(t, entries, 1) { + assert.Equal(t, job2.ID, entries[0].JobID, "job 2 should be the last-rendered one") + } + } +} + +func TestGetLastRenderedJobUUID(t *testing.T) { + ctx, close, db, job1, _ := jobTasksTestFixtures(t) + defer close() + + { + // Test without any renders. + lastUUID, err := db.GetLastRenderedJobUUID(ctx) + if assert.NoError(t, err, "absence of renders should not cause an error") { + assert.Empty(t, lastUUID) + } + } + + { + // Test with first render. + assert.NoError(t, db.SetLastRendered(ctx, job1)) + lastUUID, err := db.GetLastRenderedJobUUID(ctx) + if assert.NoError(t, err) { + assert.Equal(t, job1.UUID, lastUUID) + } + } + + { + // Test with 2nd or subsequent render. + authoredJob2 := authorTestJob("1295757b-e668-4c49-8b89-f73db8270e42", "just-a-job") + job2 := persistAuthoredJob(t, ctx, db, authoredJob2) + + assert.NoError(t, db.SetLastRendered(ctx, job2)) + lastUUID, err := db.GetLastRenderedJobUUID(ctx) + if assert.NoError(t, err) { + assert.Equal(t, job2.UUID, lastUUID) + } + } +} diff --git a/internal/manager/webupdates/job_updates.go b/internal/manager/webupdates/job_updates.go index cbced2e7..9e611f26 100644 --- a/internal/manager/webupdates/job_updates.go +++ b/internal/manager/webupdates/job_updates.go @@ -90,6 +90,9 @@ func (b *BiDirComms) BroadcastLastRenderedImage(update api.SocketIOLastRenderedU log.Debug().Interface("lastRenderedUpdate", update).Msg("socketIO: broadcasting last-rendered image update") room := roomForJob(update.JobId) b.BroadcastTo(room, SIOEventLastRenderedUpdate, update) + + // TODO: throttle these via a last-in-one-out queue (see `pkg/last_in_one_out_queue`). + b.BroadcastTo(SocketIORoomLastRendered, SIOEventLastRenderedUpdate, update) } // BroadcastTaskLogUpdate sends the task log chunk to clients. diff --git a/internal/manager/webupdates/sio_rooms.go b/internal/manager/webupdates/sio_rooms.go index 9ad416f4..907b5d38 100644 --- a/internal/manager/webupdates/sio_rooms.go +++ b/internal/manager/webupdates/sio_rooms.go @@ -24,6 +24,11 @@ const ( SocketIORoomChat SocketIORoomName = "Chat" // For chat messages. SocketIORoomJobs SocketIORoomName = "Jobs" // For job updates. SocketIORoomWorkers SocketIORoomName = "Workers" // For worker updates. + + // For updates about ALL last-rendered images. Normally these are sent to a + // room specific to a particular job, but for the global "last rendered image" + // all updates are sent here too. + SocketIORoomLastRendered SocketIORoomName = "Last-Rendered" ) const ( @@ -67,6 +72,8 @@ func (b *BiDirComms) handleRoomSubscription(c *gosocketio.Channel, subs api.Sock sioRoom = SocketIORoomJobs case api.SocketIOSubscriptionTypeAllWorkers: sioRoom = SocketIORoomWorkers + case api.SocketIOSubscriptionTypeAllLastRendered: + sioRoom = SocketIORoomLastRendered case api.SocketIOSubscriptionTypeJob: if subs.Uuid == nil { logger.Warn().Msg("socketIO: trying to (un)subscribe to job without UUID") diff --git a/web/app/src/App.vue b/web/app/src/App.vue index f1eae25e..92d90b0b 100644 --- a/web/app/src/App.vue +++ b/web/app/src/App.vue @@ -9,6 +9,9 @@
  • Workers
  • +
  • + Last Rendered +
  • diff --git a/web/app/src/components/jobs/JobDetails.vue b/web/app/src/components/jobs/JobDetails.vue index 6d344e74..1b8e048d 100644 --- a/web/app/src/components/jobs/JobDetails.vue +++ b/web/app/src/components/jobs/JobDetails.vue @@ -2,7 +2,7 @@

    Job Details