
Workers now send output produced by Blender (currently limited to PNG and JPEG images) to the Manager. Each image is first converted to JPEG, and the resulting bytes are then sent to the Manager via the Flamenco API.
162 lines
4.3 KiB
Go
162 lines
4.3 KiB
Go
package worker
|
|
|
|
// SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"image"
|
|
"image/jpeg"
|
|
_ "image/png"
|
|
"net/http"
|
|
"os"
|
|
"sync"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
|
|
"git.blender.org/flamenco/pkg/last_in_one_out_queue"
|
|
)
|
|
|
|
// thumbnailJPEGQuality is the JPEG quality setting that loadAsJPEG passes to
// jpeg.Encode when it compresses an image.
const thumbnailJPEGQuality = 80
|
|
|
|
// OutputUploader sends (downscaled versions of) rendered images to Flamenco
|
|
// Manager. Only one image is sent at a time. A queue of a single image is kept,
|
|
// where newly queued images replace older ones.
|
|
type OutputUploader struct {
|
|
client FlamencoClient
|
|
queue *last_in_one_out_queue.LastInOneOutQueue[TaskOutput]
|
|
}
|
|
|
|
// TaskOutput identifies one output file that was produced for a task. It is
// the item type stored in the OutputUploader queue.
type TaskOutput struct {
	// TaskID is the ID of the task the output belongs to; it is sent along
	// with the upload to the Manager.
	TaskID string
	// Filename is the path of the produced file; it is opened and decoded
	// as an image when the item is processed.
	Filename string
}
|
|
|
|
func NewOutputUploader(client FlamencoClient) *OutputUploader {
|
|
return &OutputUploader{
|
|
client: client,
|
|
queue: last_in_one_out_queue.New[TaskOutput](),
|
|
}
|
|
}
|
|
|
|
// OutputProduced enqueues the given filename for processing.
|
|
func (ou *OutputUploader) OutputProduced(taskID, filename string) {
|
|
// TODO: Before enqueueing (and thus overwriting any previously queued item),
|
|
// check that this file can actually be handled by the Last Rendered system of
|
|
// Flamenco. It would be a shame if a perfectly-good JPEG file is kicked off
|
|
// the queue by an EXR file we can't handle.
|
|
item := TaskOutput{taskID, filename}
|
|
ou.queue.Enqueue(item)
|
|
}
|
|
|
|
func (ou *OutputUploader) Run(ctx context.Context) {
|
|
log.Info().Msg("output uploader: running")
|
|
defer log.Info().Msg("output uploader: shutting down")
|
|
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
ou.queue.Run(ctx)
|
|
}()
|
|
|
|
runLoop:
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
break runLoop
|
|
case item := <-ou.queue.Item():
|
|
ou.process(ctx, item)
|
|
}
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
// process loads the given image, converts it to JPEG, and uploads it to
|
|
// Flamenco Manager.
|
|
func (ou *OutputUploader) process(ctx context.Context, item TaskOutput) {
|
|
logger := log.With().
|
|
Str("image", item.Filename).
|
|
Str("task", item.TaskID).
|
|
Logger()
|
|
logger.Info().Msg("output uploader: processing file before uploading to Manager")
|
|
|
|
jpegBytes := loadAsJPEG(item.Filename)
|
|
if len(jpegBytes) == 0 {
|
|
return // loadAsJPEG() already logged the error.
|
|
}
|
|
|
|
// Upload to Manager.
|
|
jpegReader := bytes.NewReader(jpegBytes)
|
|
resp, err := ou.client.TaskOutputProducedWithBodyWithResponse(
|
|
ctx, item.TaskID, "image/jpeg", jpegReader)
|
|
if err != nil {
|
|
logger.Error().Err(err).Msg("output uploader: unable to send image to Manager")
|
|
return
|
|
}
|
|
|
|
// Handle the Manager response:
|
|
switch {
|
|
case resp.StatusCode() == http.StatusAccepted:
|
|
logger.Info().Msg("output uploader: Manager accepted our image")
|
|
case resp.JSON411 != nil:
|
|
logger.Error().
|
|
Str("message", resp.JSON411.Message).
|
|
Msg("output uploader: Manager rejected our request, this is a bug in Flamenco Worker")
|
|
case resp.JSON413 != nil:
|
|
logger.Warn().
|
|
Str("message", resp.JSON413.Message).
|
|
Msg("output uploader: Manager rejected our upload, it is too large")
|
|
case resp.JSON415 != nil:
|
|
logger.Error().
|
|
Str("message", resp.JSON415.Message).
|
|
Msg("output uploader: Manager rejected our upload, unsupported file type")
|
|
case resp.JSON429 != nil:
|
|
logger.Warn().
|
|
Str("message", resp.JSON429.Message).
|
|
Msg("output uploader: Manager is too busy to handle this upload")
|
|
case resp.JSONDefault != nil:
|
|
logger.Error().
|
|
Str("message", resp.JSONDefault.Message).
|
|
Msg("output uploader: error from Manager")
|
|
default:
|
|
logger.Error().
|
|
Str("httpStatus", resp.Status()).
|
|
Msg("output uploader: unexpected error from Manager")
|
|
}
|
|
}
|
|
|
|
func loadAsJPEG(imagePath string) []byte {
|
|
logger := log.With().Str("image", imagePath).Logger()
|
|
|
|
// Open the output file.
|
|
file, err := os.Open(imagePath)
|
|
if err != nil {
|
|
logger.Error().Err(err).Msg("output uploader: error opening file")
|
|
return nil
|
|
}
|
|
defer file.Close()
|
|
|
|
// Try to decode the file as image.
|
|
img, fileType, err := image.Decode(file)
|
|
if err != nil {
|
|
logger.Error().Err(err).Msg("output uploader: cannot decode image file")
|
|
return nil
|
|
}
|
|
logger.Debug().Str("type", fileType).Msg("output uploaded: image decoded")
|
|
|
|
// Compress the image as JPEG.
|
|
jpegBuffer := bytes.Buffer{}
|
|
options := jpeg.Options{
|
|
Quality: thumbnailJPEGQuality,
|
|
}
|
|
err = jpeg.Encode(&jpegBuffer, img, &options)
|
|
if err != nil {
|
|
logger.Error().Err(err).Msg("output uploader: cannot encode image as JPEG")
|
|
return nil
|
|
}
|
|
|
|
return jpegBuffer.Bytes()
|
|
}
|