
Fix T100757 by reducing the log level to "info" when Blender writes output to a file format the Worker cannot handle. Such cases are expected, and now no longer result in an error message.
171 lines
4.7 KiB
Go
171 lines
4.7 KiB
Go
package worker
|
|
|
|
// SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"image"
|
|
"image/jpeg"
|
|
_ "image/png"
|
|
"net/http"
|
|
"os"
|
|
"sync"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
|
|
"git.blender.org/flamenco/pkg/last_in_one_out_queue"
|
|
)
|
|
|
|
// thumbnailJPEGQuality is the quality setting passed to the JPEG encoder when
// loadAsJPEG re-compresses a decoded image.
const thumbnailJPEGQuality = 85
|
|
|
|
// OutputUploader sends (downscaled versions of) rendered images to Flamenco
|
|
// Manager. Only one image is sent at a time. A queue of a single image is kept,
|
|
// where newly queued images replace older ones.
|
|
type OutputUploader struct {
|
|
client FlamencoClient
|
|
queue *last_in_one_out_queue.LastInOneOutQueue[TaskOutput]
|
|
}
|
|
|
|
// TaskOutput identifies a single output file produced by a task. It is the
// item type stored in the OutputUploader queue.
//
// The field order matters: OutputProduced constructs this struct positionally.
type TaskOutput struct {
	// TaskID is the ID of the task that produced the file; it is sent along
	// with the upload to Flamenco Manager.
	TaskID string
	// Filename is the path of the produced file, as opened by loadAsJPEG.
	Filename string
}
|
|
|
|
func NewOutputUploader(client FlamencoClient) *OutputUploader {
|
|
return &OutputUploader{
|
|
client: client,
|
|
queue: last_in_one_out_queue.New[TaskOutput](),
|
|
}
|
|
}
|
|
|
|
// OutputProduced enqueues the given filename for processing.
|
|
func (ou *OutputUploader) OutputProduced(taskID, filename string) {
|
|
// TODO: Before enqueueing (and thus overwriting any previously queued item),
|
|
// check that this file can actually be handled by the Last Rendered system of
|
|
// Flamenco. It would be a shame if a perfectly-good JPEG file is kicked off
|
|
// the queue by an EXR file we can't handle.
|
|
item := TaskOutput{taskID, filename}
|
|
ou.queue.Enqueue(item)
|
|
}
|
|
|
|
func (ou *OutputUploader) Run(ctx context.Context) {
|
|
log.Info().Msg("output uploader: running")
|
|
defer log.Info().Msg("output uploader: shutting down")
|
|
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
ou.queue.Run(ctx)
|
|
}()
|
|
|
|
runLoop:
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
break runLoop
|
|
case item := <-ou.queue.Item():
|
|
ou.process(ctx, item)
|
|
}
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
// process loads the given image, converts it to JPEG, and uploads it to
|
|
// Flamenco Manager.
|
|
func (ou *OutputUploader) process(ctx context.Context, item TaskOutput) {
|
|
logger := log.With().
|
|
Str("image", item.Filename).
|
|
Str("task", item.TaskID).
|
|
Logger()
|
|
logger.Info().Msg("output uploader: processing file before uploading to Manager")
|
|
|
|
jpegBytes := loadAsJPEG(item.Filename)
|
|
if len(jpegBytes) == 0 {
|
|
return // loadAsJPEG() already logged the error.
|
|
}
|
|
|
|
// Upload to Manager.
|
|
jpegReader := bytes.NewReader(jpegBytes)
|
|
resp, err := ou.client.TaskOutputProducedWithBodyWithResponse(
|
|
ctx, item.TaskID, "image/jpeg", jpegReader)
|
|
if err != nil {
|
|
logger.Error().Err(err).Msg("output uploader: unable to send image to Manager")
|
|
return
|
|
}
|
|
|
|
// Handle the Manager response:
|
|
switch {
|
|
case resp.StatusCode() == http.StatusAccepted:
|
|
logger.Info().Msg("output uploader: Manager accepted our image")
|
|
case resp.JSON411 != nil:
|
|
logger.Error().
|
|
Str("message", resp.JSON411.Message).
|
|
Msg("output uploader: Manager rejected our request, this is a bug in Flamenco Worker")
|
|
case resp.JSON413 != nil:
|
|
logger.Warn().
|
|
Str("message", resp.JSON413.Message).
|
|
Msg("output uploader: Manager rejected our upload, it is too large")
|
|
case resp.JSON415 != nil:
|
|
logger.Error().
|
|
Str("message", resp.JSON415.Message).
|
|
Msg("output uploader: Manager rejected our upload, unsupported file type")
|
|
case resp.JSON429 != nil:
|
|
logger.Warn().
|
|
Str("message", resp.JSON429.Message).
|
|
Msg("output uploader: Manager is too busy to handle this upload")
|
|
case resp.JSONDefault != nil:
|
|
logger.Error().
|
|
Str("message", resp.JSONDefault.Message).
|
|
Msg("output uploader: error from Manager")
|
|
default:
|
|
logger.Error().
|
|
Str("httpStatus", resp.Status()).
|
|
Msg("output uploader: unexpected error from Manager")
|
|
}
|
|
}
|
|
|
|
func loadAsJPEG(imagePath string) []byte {
|
|
logger := log.With().Str("image", imagePath).Logger()
|
|
|
|
// Open the output file.
|
|
file, err := os.Open(imagePath)
|
|
if err != nil {
|
|
logger.Error().Err(err).Msg("output uploader: error opening file")
|
|
return nil
|
|
}
|
|
defer file.Close()
|
|
|
|
// Try to decode the file as image.
|
|
img, fileType, err := image.Decode(file)
|
|
switch {
|
|
case errors.Is(err, image.ErrFormat):
|
|
// Blender writing formats not supported by this Go code will happen, for
|
|
// example EXR files. This is fine, and shouldn't even trigger a warning.
|
|
logger.Info().Msg("output uploader: file a format I cannot decode, ignoring it")
|
|
return nil
|
|
case err != nil:
|
|
// Any other error than the above is more serious, though, because that
|
|
// could mean file corruption.
|
|
logger.Error().Err(err).Msg("output uploader: cannot decode image file")
|
|
return nil
|
|
}
|
|
logger.Debug().Str("type", fileType).Msg("output uploaded: image decoded")
|
|
|
|
// Compress the image as JPEG.
|
|
jpegBuffer := bytes.Buffer{}
|
|
options := jpeg.Options{
|
|
Quality: thumbnailJPEGQuality,
|
|
}
|
|
err = jpeg.Encode(&jpegBuffer, img, &options)
|
|
if err != nil {
|
|
logger.Error().Err(err).Msg("output uploader: cannot encode image as JPEG")
|
|
return nil
|
|
}
|
|
|
|
return jpegBuffer.Bytes()
|
|
}
|