Add a "Last Rendered" view

Add a "Last Rendered" view to the webapp.

The Manager now stores (in the database) which job was the last
recipient of a rendered image, and serves that to the appropriate
OpenAPI endpoint.

A new SocketIO subscription + accompanying room makes it possible for
the web interface to receive all rendered images (if they survive the
queue, which discards images when it gets too full).
This commit is contained in:
Sybren A. Stüvel 2022-07-01 12:34:40 +02:00
parent 801fa20f12
commit d25151184d
19 changed files with 361 additions and 35 deletions

View File

@ -67,6 +67,11 @@ type PersistenceService interface {
// Database queries.
QueryJobs(ctx context.Context, query api.JobsQuery) ([]*persistence.Job, error)
QueryJobTaskSummaries(ctx context.Context, jobUUID string) ([]*persistence.Task, error)
// SetLastRendered sets this job as the one with the most recent rendered image.
SetLastRendered(ctx context.Context, j *persistence.Job) error
// GetLastRendered returns the UUID of the job with the most recent rendered image.
GetLastRenderedJobUUID(ctx context.Context) (string, error)
}
var _ PersistenceService = (*persistence.DB)(nil)

View File

@ -334,6 +334,23 @@ func (f *Flamenco) FetchJobLastRenderedInfo(e echo.Context, jobID string) error
return e.JSON(http.StatusOK, info)
}
func (f *Flamenco) FetchGlobalLastRenderedInfo(e echo.Context) error {
ctx := e.Request().Context()
logger := requestLogger(e)
jobUUID, err := f.persist.GetLastRenderedJobUUID(ctx)
if err != nil {
logger.Error().Err(err).Msg("error getting job UUID with last-rendered image")
return sendAPIError(e, http.StatusInternalServerError, "error finding global last-rendered info: %v", err)
}
if jobUUID == "" {
return e.NoContent(http.StatusNoContent)
}
return f.FetchJobLastRenderedInfo(e, jobUUID)
}
func (f *Flamenco) lastRenderedInfoForJob(logger zerolog.Logger, jobUUID string) (*api.JobLastRenderedImageInfo, error) {
basePath := f.lastRender.PathForJob(jobUUID)
relPath, err := f.localStorage.RelPath(basePath)

View File

@ -353,3 +353,45 @@ func TestFetchJobLastRenderedInfo(t *testing.T) {
assertResponseNoContent(t, echoCtx)
}
}
func TestFetchGlobalLastRenderedInfo(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mf := newMockedFlamenco(mockCtrl)
jobUUID := "18a9b096-d77e-438c-9be2-74397038298b"
{
// No last-rendered image exists yet.
mf.persistence.EXPECT().GetLastRenderedJobUUID(gomock.Any()).Return("", nil)
echoCtx := mf.prepareMockedRequest(nil)
err := mf.flamenco.FetchGlobalLastRenderedInfo(echoCtx)
assert.NoError(t, err)
assertResponseNoContent(t, echoCtx)
}
{
// Last-rendered image has been processed.
mf.persistence.EXPECT().GetLastRenderedJobUUID(gomock.Any()).Return(jobUUID, nil)
mf.lastRender.EXPECT().JobHasImage(jobUUID).Return(true)
mf.lastRender.EXPECT().PathForJob(jobUUID).Return("/absolute/path/to/local/job/dir")
mf.localStorage.EXPECT().RelPath("/absolute/path/to/local/job/dir").Return("relative/path", nil)
mf.lastRender.EXPECT().ThumbSpecs().Return([]last_rendered.Thumbspec{
{Filename: "das grosses potaat.jpg"},
{Filename: "invisibru.jpg"},
})
echoCtx := mf.prepareMockedRequest(nil)
err := mf.flamenco.FetchGlobalLastRenderedInfo(echoCtx)
assert.NoError(t, err)
expectBody := api.JobLastRenderedImageInfo{
Base: "/job-files/relative/path",
Suffixes: []string{"das grosses potaat.jpg", "invisibru.jpg"},
}
assertResponseJSON(t, echoCtx, http.StatusOK, expectBody)
}
}

View File

@ -217,6 +217,21 @@ func (mr *MockPersistenceServiceMockRecorder) FetchWorkers(arg0 interface{}) *go
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorkers", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorkers), arg0)
}
// GetLastRenderedJobUUID mocks base method.
func (m *MockPersistenceService) GetLastRenderedJobUUID(arg0 context.Context) (string, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetLastRenderedJobUUID", arg0)
ret0, _ := ret[0].(string)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetLastRenderedJobUUID indicates an expected call of GetLastRenderedJobUUID.
func (mr *MockPersistenceServiceMockRecorder) GetLastRenderedJobUUID(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLastRenderedJobUUID", reflect.TypeOf((*MockPersistenceService)(nil).GetLastRenderedJobUUID), arg0)
}
// QueryJobTaskSummaries mocks base method.
func (m *MockPersistenceService) QueryJobTaskSummaries(arg0 context.Context, arg1 string) ([]*persistence.Task, error) {
m.ctrl.T.Helper()
@ -332,6 +347,20 @@ func (mr *MockPersistenceServiceMockRecorder) ScheduleTask(arg0, arg1 interface{
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScheduleTask", reflect.TypeOf((*MockPersistenceService)(nil).ScheduleTask), arg0, arg1)
}
// SetLastRendered mocks base method.
func (m *MockPersistenceService) SetLastRendered(arg0 context.Context, arg1 *persistence.Job) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SetLastRendered", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// SetLastRendered indicates an expected call of SetLastRendered.
func (mr *MockPersistenceServiceMockRecorder) SetLastRendered(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetLastRendered", reflect.TypeOf((*MockPersistenceService)(nil).SetLastRendered), arg0, arg1)
}
// StoreAuthoredJob mocks base method.
func (m *MockPersistenceService) StoreAuthoredJob(arg0 context.Context, arg1 job_compilers.AuthoredJob) error {
m.ctrl.T.Helper()

View File

@ -408,7 +408,10 @@ func (f *Flamenco) TaskOutputProduced(e echo.Context, taskID string) error {
MimeType: e.Request().Header.Get("Content-Type"),
Image: imageBytes,
Callback: func() {
Callback: func(ctx context.Context) {
// Store this job as the last one to get a rendered image.
f.persist.SetLastRendered(ctx, dbTask.Job)
// Broadcast when the processing is done.
update := webupdates.NewLastRenderedUpdate(jobUUID)
update.Thumbnail = *thumbnailInfo

View File

@ -548,6 +548,8 @@ func TestTaskOutputProduced(t *testing.T) {
{
mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker)
mf.persistence.EXPECT().FetchTask(gomock.Any(), task.UUID).Return(&task, nil)
// Don't expect persistence.SetLastRendered(...) quite yet. That should be
// called after the image processing is done.
echo := prepareRequest(bytes.NewReader(bodyBytes))
echo.Request().Header.Set("Content-Type", "image/jpeg")
@ -568,8 +570,9 @@ func TestTaskOutputProduced(t *testing.T) {
assertResponseNoBody(t, echo, http.StatusAccepted)
if assert.NotNil(t, actualPayload) {
// Calling the callback function is normally done by the last-rendered image processor.
// It should result in a SocketIO broadcast.
ctx := context.Background()
mf.persistence.EXPECT().SetLastRendered(ctx, &job)
expectBroadcast := api.SocketIOLastRenderedUpdate{
JobId: job.UUID,
Thumbnail: api.JobLastRenderedImageInfo{
@ -578,7 +581,9 @@ func TestTaskOutputProduced(t *testing.T) {
},
}
mf.broadcaster.EXPECT().BroadcastLastRenderedImage(expectBroadcast)
actualPayload.Callback()
// Calling the callback function is normally done by the last-rendered image processor.
actualPayload.Callback(ctx)
// Compare the parameter to `QueueImage()` in a way that ignores the callback function.
actualPayload.Callback = nil

View File

@ -60,7 +60,7 @@ type Payload struct {
Image []byte
// Callback is called when the image processing is finished.
Callback func()
Callback func(ctx context.Context)
}
// Thumbspec specifies a thumbnail size & filename.
@ -88,7 +88,7 @@ func (lrp *LastRenderedProcessor) Run(ctx context.Context) {
case <-ctx.Done():
return
case payload := <-lrp.queue:
lrp.processImage(payload)
lrp.processImage(ctx, payload)
}
}
}
@ -146,7 +146,7 @@ func (lrp *LastRenderedProcessor) ThumbSpecs() []Thumbspec {
//
// Because this is intended as internal queue-processing function, errors are
// logged but not returned.
func (lrp *LastRenderedProcessor) processImage(payload Payload) {
func (lrp *LastRenderedProcessor) processImage(ctx context.Context, payload Payload) {
jobDir := lrp.PathForJob(payload.JobUUID)
logger := log.With().Str("jobDir", jobDir).Logger()
@ -175,7 +175,7 @@ func (lrp *LastRenderedProcessor) processImage(payload Payload) {
// Call the callback, if provided.
if payload.Callback != nil {
payload.Callback()
payload.Callback(ctx)
}
}

View File

@ -3,6 +3,7 @@ package last_rendered
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"image"
"os"
"path/filepath"
@ -63,7 +64,7 @@ func TestProcessImage(t *testing.T) {
lrp := New(storage)
callbackCount := 0
payload.Callback = func() {
payload.Callback = func(context.Context) {
callbackCount++
}
@ -73,7 +74,7 @@ func TestProcessImage(t *testing.T) {
assert.NoFileExists(t, filepath.Join(jobdir, "last-rendered-small.jpg"))
assert.NoFileExists(t, filepath.Join(jobdir, "last-rendered-tiny.jpg"))
lrp.processImage(payload)
lrp.processImage(context.Background(), payload)
// The files should exist now.
assert.FileExists(t, filepath.Join(jobdir, "last-rendered.jpg"))

View File

@ -10,6 +10,7 @@ func (db *DB) migrate() error {
err := db.gormDB.AutoMigrate(
&Job{},
&JobBlock{},
&LastRendered{},
&Task{},
&TaskFailure{},
&Worker{},

View File

@ -0,0 +1,48 @@
package persistence
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"gorm.io/gorm/clause"
)
// LastRendered only has one entry in its database table, to indicate the job
// that was the last to receive a "last rendered image" from a Worker.
// This is used to show the global last-rendered image in the web interface.
type LastRendered struct {
Model
JobID uint `gorm:"default:0"`
Job *Job `gorm:"foreignkey:JobID;references:ID;constraint:OnDelete:CASCADE"`
}
// SetLastRendered sets this job as the one with the most recent rendered image.
func (db *DB) SetLastRendered(ctx context.Context, j *Job) error {
render := LastRendered{
// Always use the same database ID to ensure a single entry.
Model: Model{ID: uint(1)},
JobID: j.ID,
Job: j,
}
tx := db.gormDB.
WithContext(ctx).
Clauses(clause.OnConflict{UpdateAll: true}).
Create(&render)
return tx.Error
}
// GetLastRendered returns the UUID of the job with the most recent rendered image.
func (db *DB) GetLastRenderedJobUUID(ctx context.Context) (string, error) {
job := Job{}
tx := db.gormDB.WithContext(ctx).
Joins("inner join last_rendereds LR on jobs.id = LR.job_id").
Select("uuid").
Find(&job)
if tx.Error != nil {
return "", jobError(tx.Error, "finding job with most rencent render")
}
return job.UUID, nil
}

View File

@ -0,0 +1,69 @@
package persistence
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestSetLastRendered(t *testing.T) {
ctx, close, db, job1, _ := jobTasksTestFixtures(t)
defer close()
authoredJob2 := authorTestJob("1295757b-e668-4c49-8b89-f73db8270e42", "just-a-job")
job2 := persistAuthoredJob(t, ctx, db, authoredJob2)
assert.NoError(t, db.SetLastRendered(ctx, job1))
{
entries := []LastRendered{}
db.gormDB.Model(&LastRendered{}).Scan(&entries)
if assert.Len(t, entries, 1) {
assert.Equal(t, job1.ID, entries[0].JobID, "job 1 should be the last-rendered one")
}
}
assert.NoError(t, db.SetLastRendered(ctx, job2))
{
entries := []LastRendered{}
db.gormDB.Model(&LastRendered{}).Scan(&entries)
if assert.Len(t, entries, 1) {
assert.Equal(t, job2.ID, entries[0].JobID, "job 2 should be the last-rendered one")
}
}
}
func TestGetLastRenderedJobUUID(t *testing.T) {
ctx, close, db, job1, _ := jobTasksTestFixtures(t)
defer close()
{
// Test without any renders.
lastUUID, err := db.GetLastRenderedJobUUID(ctx)
if assert.NoError(t, err, "absence of renders should not cause an error") {
assert.Empty(t, lastUUID)
}
}
{
// Test with first render.
assert.NoError(t, db.SetLastRendered(ctx, job1))
lastUUID, err := db.GetLastRenderedJobUUID(ctx)
if assert.NoError(t, err) {
assert.Equal(t, job1.UUID, lastUUID)
}
}
{
// Test with 2nd or subsequent render.
authoredJob2 := authorTestJob("1295757b-e668-4c49-8b89-f73db8270e42", "just-a-job")
job2 := persistAuthoredJob(t, ctx, db, authoredJob2)
assert.NoError(t, db.SetLastRendered(ctx, job2))
lastUUID, err := db.GetLastRenderedJobUUID(ctx)
if assert.NoError(t, err) {
assert.Equal(t, job2.UUID, lastUUID)
}
}
}

View File

@ -90,6 +90,9 @@ func (b *BiDirComms) BroadcastLastRenderedImage(update api.SocketIOLastRenderedU
log.Debug().Interface("lastRenderedUpdate", update).Msg("socketIO: broadcasting last-rendered image update")
room := roomForJob(update.JobId)
b.BroadcastTo(room, SIOEventLastRenderedUpdate, update)
// TODO: throttle these via a last-in-one-out queue (see `pkg/last_in_one_out_queue`).
b.BroadcastTo(SocketIORoomLastRendered, SIOEventLastRenderedUpdate, update)
}
// BroadcastTaskLogUpdate sends the task log chunk to clients.

View File

@ -24,6 +24,11 @@ const (
SocketIORoomChat SocketIORoomName = "Chat" // For chat messages.
SocketIORoomJobs SocketIORoomName = "Jobs" // For job updates.
SocketIORoomWorkers SocketIORoomName = "Workers" // For worker updates.
// For updates about ALL last-rendered images. Normally these are sent to a
// room specific to a particular job, but for the global "last rendered image"
// all updates are sent here too.
SocketIORoomLastRendered SocketIORoomName = "Last-Rendered"
)
const (
@ -67,6 +72,8 @@ func (b *BiDirComms) handleRoomSubscription(c *gosocketio.Channel, subs api.Sock
sioRoom = SocketIORoomJobs
case api.SocketIOSubscriptionTypeAllWorkers:
sioRoom = SocketIORoomWorkers
case api.SocketIOSubscriptionTypeAllLastRendered:
sioRoom = SocketIORoomLastRendered
case api.SocketIOSubscriptionTypeJob:
if subs.Uuid == nil {
logger.Warn().Msg("socketIO: trying to (un)subscribe to job without UUID")

View File

@ -9,6 +9,9 @@
<li>
<router-link :to="{ name: 'workers' }">Workers</router-link>
</li>
<li>
<router-link :to="{ name: 'last-rendered' }">Last Rendered</router-link>
</li>
</ul>
</nav>
<api-spinner />

View File

@ -2,7 +2,7 @@
<h2 class="column-title">Job Details</h2>
<template v-if="hasJobData">
<last-rendered-image ref="lastRenderedImage" :jobID="jobData.id" />
<last-rendered-image ref="lastRenderedImage" :jobID="jobData.id" thumbnailSuffix="-tiny" />
<dl>
<dt class="field-id">ID</dt>

View File

@ -1,14 +1,30 @@
<template>
<div v-if="imageURL != ''" :class="cssClasses">
<img :src="imageURL" alt="Last-rendered image for this job">
</div>
</template>
<script setup>
import { reactive, ref, watch } from 'vue'
import { api } from '@/urls';
import { JobsApi, JobLastRenderedImageInfo, SocketIOLastRenderedUpdate } from '@/manager-api';
import { apiClient } from '@/stores/api-query-count';
const props = defineProps(['jobID']);
const props = defineProps([
/* The job UUID to show renders for, or some false-y value if renders from all
* jobs should be accepted. */
'jobID',
/* Name of the thumbnail, or subset thereof. See `JobLastRenderedImageInfo` in
* `flamenco-openapi.yaml`, and * `internal/manager/last_rendered/last_rendered.go`.
* The component picks the 'suffix' that has the given `thumbnailSuffix` as
* substring. */
'thumbnailSuffix',
]);
const imageURL = ref('');
const cssClasses = reactive({
lastRendered: true,
nothingRenderedYet: true,
'last-rendered': true,
'nothing-rendered-yet': true,
})
const jobsApi = new JobsApi(apiClient);
@ -17,7 +33,13 @@ const jobsApi = new JobsApi(apiClient);
* Fetches the last-rendered info for the given job, then updates the <img> tag for it.
*/
function fetchImageURL(jobID) {
jobsApi.fetchJobLastRenderedInfo(jobID)
let promise;
if (jobID)
promise = jobsApi.fetchJobLastRenderedInfo(jobID);
else
promise = jobsApi.fetchGlobalLastRenderedInfo();
promise
.then(setImageURL)
.catch((error) => { console.warn("error fetching last-rendered image info:", error) });
}
@ -26,17 +48,22 @@ function fetchImageURL(jobID) {
* @param {JobLastRenderedImageInfo} thumbnailInfo
*/
function setImageURL(thumbnailInfo) {
console.log("LastRenderedImage.vue: setImageURL", thumbnailInfo);
if (thumbnailInfo == null) {
// This indicates that there is no last-rendered image.
// Default to a hard-coded 'nothing to be seen here, move along' image.
imageURL.value = "/nothing-rendered-yet.svg";
cssClasses.nothingRenderedYet = true;
cssClasses['nothing-rendered-yet'] = true;
console.log("LastRenderedImage.vue: setting image URL to:", imageURL.value);
return;
}
// Set the image URL to something appropriate.
let foundThumbnail = false;
const suffixToFind = props.thumbnailSuffix;
for (let suffix of thumbnailInfo.suffixes) {
if (!suffix.includes("-tiny")) continue;
if (!suffix.includes(suffixToFind)) continue;
// This uses the API URL to construct the image URL, as the image comes from
// Flamenco Manager, and not from any development server that might be
@ -45,16 +72,22 @@ function setImageURL(thumbnailInfo) {
url.pathname = thumbnailInfo.base + "/" + suffix
url.search = new Date().getTime(); // This forces the image to be reloaded.
imageURL.value = url.toString();
console.log("LastRenderedImage.vue: setting image URL to:", imageURL.value);
foundThumbnail = true;
break;
}
cssClasses.nothingRenderedYet = false;
if (!foundThumbnail) {
console.warn(`LastRenderedImage.vue: could not find thumbnail with suffix "${suffixToFind}"; available are:`, thumbnailInfo.suffixes);
}
cssClasses['nothing-rendered-yet'] = !foundThumbnail;
}
/**
* @param {SocketIOLastRenderedUpdate} lastRenderedUpdate
*/
function refreshLastRenderedImage(lastRenderedUpdate) {
if (lastRenderedUpdate.job_id != props.jobID) {
// Only filter out other job IDs if this component has actually a non-empty job ID.
if (props.jobID && lastRenderedUpdate.job_id != props.jobID) {
console.log(
"LastRenderedImage.vue: refreshLastRenderedImage() received update for job",
lastRenderedUpdate.job_id,
@ -62,6 +95,7 @@ function refreshLastRenderedImage(lastRenderedUpdate) {
return;
}
console.log('refreshLastRenderedImage:', lastRenderedUpdate);
setImageURL(lastRenderedUpdate.thumbnail);
}
@ -79,27 +113,14 @@ defineExpose({
});
</script>
<template>
<div v-if="imageURL != ''" :class="cssClasses">
<img :src="imageURL" alt="Last-rendered image for this job">
</div>
</template>
<style scoped>
.lastRendered {
width: 200px;
height: 112px;
float: right;
}
.lastRendered.nothingRenderedYet {
.last-rendered.nothing-rendered-yet {
outline: thin dotted var(--color-text-hint);
}
.lastRendered img {
.last-rendered img {
width: 100%;
height: 100%;
object-fit: contain;
}
</style>

View File

@ -20,6 +20,11 @@ const router = createRouter({
component: () => import('../views/WorkersView.vue'),
props: true,
},
{
path: '/last-rendered',
name: 'last-rendered',
component: () => import('../views/LastRenderedView.vue'),
},
],
})

View File

@ -160,7 +160,6 @@ export default {
* @param {API.SocketIOLastRenderedUpdate} lastRenderedUpdate
*/
onSioLastRenderedUpdate(lastRenderedUpdate) {
console.log('lastRenderedUpdate:', lastRenderedUpdate);
this.$refs.jobDetails.refreshLastRenderedImage(lastRenderedUpdate);
},
@ -248,4 +247,10 @@ export default {
.isFetching {
opacity: 50%;
}
.last-rendered {
width: 200px;
height: 112px;
float: right;
}
</style>

View File

@ -0,0 +1,62 @@
<template>
<div class="global-last-rendered">
<last-rendered-image
ref="lastRenderedImage"
:jobID="false"
thumbnailSuffix="last-rendered.jpg" />
</div>
<footer class="window-footer"><notification-bar /></footer>
<update-listener ref="updateListener" mainSubscription="allLastRendered"
@lastRenderedUpdate="onSioLastRenderedUpdate"
@sioReconnected="onSIOReconnected" @sioDisconnected="onSIODisconnected" />
</template>
<script>
import LastRenderedImage from '@/components/jobs/LastRenderedImage.vue'
import NotificationBar from '@/components/footer/NotificationBar.vue'
import UpdateListener from '@/components/UpdateListener.vue'
export default {
name: 'LastRenderedView',
components: {
LastRenderedImage,
NotificationBar,
UpdateListener,
},
data: () => ({
}),
methods: {
/**
* Event handler for SocketIO "last-rendered" updates.
* @param {API.SocketIOLastRenderedUpdate} lastRenderedUpdate
*/
onSioLastRenderedUpdate(lastRenderedUpdate) {
this.$refs.lastRenderedImage.refreshLastRenderedImage(lastRenderedUpdate);
},
// SocketIO connection event handlers:
onSIOReconnected() {
},
onSIODisconnected(reason) {
},
},
}
</script>
<style scoped>
.last-rendered {
max-height: 100%;
}
.global-last-rendered {
grid-column-start: col-1;
grid-column-end: col-3;
/* FIXME: the positioning of the image & sizing of the container DIV doesn't
work well yet. */
height: 100%;
}
</style>