Worker: send produced output to Manager

Workers now send output produced by Blender (limited to PNG and JPEG
images, currently) to Manager. This is done by converting to JPEG first,
then sending the bytes via the Flamenco API to the Manager.
This commit is contained in:
Sybren A. Stüvel 2022-06-26 15:01:59 +02:00
parent 3d20a89bf5
commit 2d6c11e98b
3 changed files with 291 additions and 16 deletions

View File

@ -8,7 +8,6 @@ import (
"fmt"
"strings"
"sync"
"time"
"github.com/rs/zerolog/log"
@ -27,6 +26,7 @@ type Listener struct {
doneWg *sync.WaitGroup
client FlamencoClient
buffer UpstreamBuffer
outputUploader *OutputUploader
}
// UpstreamBuffer can buffer up-stream task updates, in case the Manager cannot be reached.
@ -40,6 +40,7 @@ func NewListener(client FlamencoClient, buffer UpstreamBuffer) *Listener {
doneWg: new(sync.WaitGroup),
client: client,
buffer: buffer,
outputUploader: NewOutputUploader(client),
}
l.doneWg.Add(1)
return l
@ -49,14 +50,8 @@ func (l *Listener) Run(ctx context.Context) {
defer l.doneWg.Done()
defer log.Debug().Msg("listener shutting down")
for {
select {
case <-ctx.Done():
return
case <-time.After(10 * time.Second):
log.Trace().Msg("listener is still running")
}
}
log.Debug().Msg("listener starting up")
l.outputUploader.Run(ctx)
}
func (l *Listener) Wait() {
@ -103,7 +98,8 @@ func (l *Listener) LogProduced(ctx context.Context, taskID string, logLines ...s
// OutputProduced tells the Manager there has been some output (most commonly a rendered frame or video).
func (l *Listener) OutputProduced(ctx context.Context, taskID string, outputLocation string) error {
return fmt.Errorf("Listener.OutputProduced(%q, %q): not implemented yet", taskID, outputLocation)
l.outputUploader.OutputProduced(taskID, outputLocation)
return nil
}
func (l *Listener) sendTaskUpdate(ctx context.Context, taskID string, update api.TaskUpdateJSONRequestBody) error {

View File

@ -0,0 +1,161 @@
package worker
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"bytes"
"context"
"image"
"image/jpeg"
_ "image/png"
"net/http"
"os"
"sync"
"github.com/rs/zerolog/log"
"git.blender.org/flamenco/pkg/last_in_one_out_queue"
)
const thumbnailJPEGQuality = 80
// OutputUploader sends (downscaled versions of) rendered images to Flamenco
// Manager. Only one image is sent at a time. A queue of a single image is kept,
// where newly queued images replace older ones.
type OutputUploader struct {
client FlamencoClient
queue *last_in_one_out_queue.LastInOneOutQueue[TaskOutput]
}
type TaskOutput struct {
TaskID string
Filename string
}
func NewOutputUploader(client FlamencoClient) *OutputUploader {
return &OutputUploader{
client: client,
queue: last_in_one_out_queue.New[TaskOutput](),
}
}
// OutputProduced enqueues the given filename for processing.
func (ou *OutputUploader) OutputProduced(taskID, filename string) {
// TODO: Before enqueueing (and thus overwriting any previously queued item),
// check that this file can actually be handled by the Last Rendered system of
// Flamenco. It would be a shame if a perfectly-good JPEG file is kicked off
// the queue by an EXR file we can't handle.
item := TaskOutput{taskID, filename}
ou.queue.Enqueue(item)
}
func (ou *OutputUploader) Run(ctx context.Context) {
log.Info().Msg("output uploader: running")
defer log.Info().Msg("output uploader: shutting down")
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
ou.queue.Run(ctx)
}()
runLoop:
for {
select {
case <-ctx.Done():
break runLoop
case item := <-ou.queue.Item():
ou.process(ctx, item)
}
}
wg.Wait()
}
// process loads the given image, converts it to JPEG, and uploads it to
// Flamenco Manager.
func (ou *OutputUploader) process(ctx context.Context, item TaskOutput) {
logger := log.With().
Str("image", item.Filename).
Str("task", item.TaskID).
Logger()
logger.Info().Msg("output uploader: processing file before uploading to Manager")
jpegBytes := loadAsJPEG(item.Filename)
if len(jpegBytes) == 0 {
return // loadAsJPEG() already logged the error.
}
// Upload to Manager.
jpegReader := bytes.NewReader(jpegBytes)
resp, err := ou.client.TaskOutputProducedWithBodyWithResponse(
ctx, item.TaskID, "image/jpeg", jpegReader)
if err != nil {
logger.Error().Err(err).Msg("output uploader: unable to send image to Manager")
return
}
// Handle the Manager response:
switch {
case resp.StatusCode() == http.StatusAccepted:
logger.Info().Msg("output uploader: Manager accepted our image")
case resp.JSON411 != nil:
logger.Error().
Str("message", resp.JSON411.Message).
Msg("output uploader: Manager rejected our request, this is a bug in Flamenco Worker")
case resp.JSON413 != nil:
logger.Warn().
Str("message", resp.JSON413.Message).
Msg("output uploader: Manager rejected our upload, it is too large")
case resp.JSON415 != nil:
logger.Error().
Str("message", resp.JSON415.Message).
Msg("output uploader: Manager rejected our upload, unsupported file type")
case resp.JSON429 != nil:
logger.Warn().
Str("message", resp.JSON429.Message).
Msg("output uploader: Manager is too busy to handle this upload")
case resp.JSONDefault != nil:
logger.Error().
Str("message", resp.JSONDefault.Message).
Msg("output uploader: error from Manager")
default:
logger.Error().
Str("httpStatus", resp.Status()).
Msg("output uploader: unexpected error from Manager")
}
}
func loadAsJPEG(imagePath string) []byte {
logger := log.With().Str("image", imagePath).Logger()
// Open the output file.
file, err := os.Open(imagePath)
if err != nil {
logger.Error().Err(err).Msg("output uploader: error opening file")
return nil
}
defer file.Close()
// Try to decode the file as image.
img, fileType, err := image.Decode(file)
if err != nil {
logger.Error().Err(err).Msg("output uploader: cannot decode image file")
return nil
}
logger.Debug().Str("type", fileType).Msg("output uploaded: image decoded")
// Compress the image as JPEG.
jpegBuffer := bytes.Buffer{}
options := jpeg.Options{
Quality: thumbnailJPEGQuality,
}
err = jpeg.Encode(&jpegBuffer, img, &options)
if err != nil {
logger.Error().Err(err).Msg("output uploader: cannot encode image as JPEG")
return nil
}
return jpegBuffer.Bytes()
}

View File

@ -0,0 +1,118 @@
package worker
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"net/http"
"sync"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"git.blender.org/flamenco/internal/worker/mocks"
"git.blender.org/flamenco/pkg/api"
)
func TestQueueOutput(t *testing.T) {
ou, mocks, finish := mockedOutputUploader(t)
defer finish()
taskID := "094d98ba-d6e2-4765-a10b-70533604a952"
// Run the queue process, otherwise the queued item cannot be received.
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
ou.Run(mocks.ctx)
}()
ou.OutputProduced(taskID, "filename.jpg")
// The output should be queued for processing now.
select {
case item := <-ou.queue.Item():
assert.Equal(t, item.Filename, "filename.jpg")
assert.Equal(t, item.TaskID, taskID)
case <-time.After(1 * time.Second):
t.Fatal("output should be queued for processing")
}
mocks.ctxCancel()
wg.Wait()
}
func TestProcess(t *testing.T) {
ou, mocks, finish := mockedOutputUploader(t)
defer finish()
taskID := "094d98ba-d6e2-4765-a10b-70533604a952"
filename := "command_ffmpeg_test_files/frame-1.png"
item := TaskOutput{
TaskID: taskID,
Filename: filename,
}
{
// Test happy response from Manager.
response := api.TaskOutputProducedResponse{
HTTPResponse: &http.Response{
Status: "202 Accepted",
StatusCode: http.StatusAccepted,
},
}
mocks.client.EXPECT().TaskOutputProducedWithBodyWithResponse(
mocks.ctx, taskID, "image/jpeg", gomock.Any()).
Return(&response, nil)
ou.process(mocks.ctx, item)
}
{
// Test unhappy response from Manager (its queue is full).
// The only difference with the happy flow is in the logging, so that's hard
// to assert for here. It's a different flow though, with a different
// non-nil pointer, so it's good to at least check it doesn't cause any
// panics.
response := api.TaskOutputProducedResponse{
JSON429: &api.Error{
Code: http.StatusTooManyRequests,
Message: "enhance your calm",
},
}
mocks.client.EXPECT().TaskOutputProducedWithBodyWithResponse(
mocks.ctx, taskID, "image/jpeg", gomock.Any()).
Return(&response, nil)
ou.process(mocks.ctx, item)
}
}
type outputUploaderTestMocks struct {
client *mocks.MockFlamencoClient
ctx context.Context
ctxCancel context.CancelFunc
}
func mockedOutputUploader(t *testing.T) (*OutputUploader, *outputUploaderTestMocks, func()) {
mockCtrl := gomock.NewController(t)
ctx, cancel := context.WithCancel(context.Background())
mocks := outputUploaderTestMocks{
client: mocks.NewMockFlamencoClient(mockCtrl),
ctx: ctx,
ctxCancel: cancel,
}
ou := NewOutputUploader(mocks.client)
finish := func() {
cancel()
mockCtrl.Finish()
}
return ou, &mocks, finish
}