flamenco/internal/worker/output_uploader.go
Sybren A. Stüvel 94fba20ef6 Worker: reduce log level of some internal components
Reduce the log level from 'info' to 'debug' on some internal components
of Flamenco Worker. This makes the console output slightly less noisy,
and it's unlikely that these particular messages are commonly needed.
2024-04-16 10:53:29 +02:00

172 lines
4.7 KiB
Go

package worker
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"bytes"
"context"
"errors"
"image"
"image/jpeg"
_ "image/png"
"net/http"
"os"
"sync"
"github.com/rs/zerolog/log"
_ "golang.org/x/image/tiff"
"projects.blender.org/studio/flamenco/pkg/last_in_one_out_queue"
)
const thumbnailJPEGQuality = 85
// OutputUploader sends (downscaled versions of) rendered images to Flamenco
// Manager. Only one image is sent at a time. A queue of a single image is kept,
// where newly queued images replace older ones.
type OutputUploader struct {
client FlamencoClient
queue *last_in_one_out_queue.LastInOneOutQueue[TaskOutput]
}
type TaskOutput struct {
TaskID string
Filename string
}
func NewOutputUploader(client FlamencoClient) *OutputUploader {
return &OutputUploader{
client: client,
queue: last_in_one_out_queue.New[TaskOutput](),
}
}
// OutputProduced enqueues the given filename for processing.
func (ou *OutputUploader) OutputProduced(taskID, filename string) {
// TODO: Before enqueueing (and thus overwriting any previously queued item),
// check that this file can actually be handled by the Last Rendered system of
// Flamenco. It would be a shame if a perfectly-good JPEG file is kicked off
// the queue by an EXR file we can't handle.
item := TaskOutput{taskID, filename}
ou.queue.Enqueue(item)
}
func (ou *OutputUploader) Run(ctx context.Context) {
log.Debug().Msg("output uploader: running")
defer log.Debug().Msg("output uploader: shutting down")
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
ou.queue.Run(ctx)
}()
runLoop:
for {
select {
case <-ctx.Done():
break runLoop
case item := <-ou.queue.Item():
ou.process(ctx, item)
}
}
wg.Wait()
}
// process loads the given image, converts it to JPEG, and uploads it to
// Flamenco Manager.
func (ou *OutputUploader) process(ctx context.Context, item TaskOutput) {
logger := log.With().
Str("image", item.Filename).
Str("task", item.TaskID).
Logger()
logger.Info().Msg("output uploader: processing file before uploading to Manager")
jpegBytes := loadAsJPEG(item.Filename)
if len(jpegBytes) == 0 {
return // loadAsJPEG() already logged the error.
}
// Upload to Manager.
jpegReader := bytes.NewReader(jpegBytes)
resp, err := ou.client.TaskOutputProducedWithBodyWithResponse(
ctx, item.TaskID, "image/jpeg", jpegReader)
if err != nil {
logger.Error().Err(err).Msg("output uploader: unable to send image to Manager")
return
}
// Handle the Manager response:
switch {
case resp.StatusCode() == http.StatusAccepted:
logger.Info().Msg("output uploader: Manager accepted our image")
case resp.JSON411 != nil:
logger.Error().
Str("message", resp.JSON411.Message).
Msg("output uploader: Manager rejected our request, this is a bug in Flamenco Worker")
case resp.JSON413 != nil:
logger.Warn().
Str("message", resp.JSON413.Message).
Msg("output uploader: Manager rejected our upload, it is too large")
case resp.JSON415 != nil:
logger.Error().
Str("message", resp.JSON415.Message).
Msg("output uploader: Manager rejected our upload, unsupported file type")
case resp.JSON429 != nil:
logger.Warn().
Str("message", resp.JSON429.Message).
Msg("output uploader: Manager is too busy to handle this upload")
case resp.JSONDefault != nil:
logger.Error().
Str("message", resp.JSONDefault.Message).
Msg("output uploader: error from Manager")
default:
logger.Error().
Str("httpStatus", resp.Status()).
Msg("output uploader: unexpected error from Manager")
}
}
func loadAsJPEG(imagePath string) []byte {
logger := log.With().Str("image", imagePath).Logger()
// Open the output file.
file, err := os.Open(imagePath)
if err != nil {
logger.Error().Err(err).Msg("output uploader: error opening file")
return nil
}
defer file.Close()
// Try to decode the file as image.
img, fileType, err := image.Decode(file)
switch {
case errors.Is(err, image.ErrFormat):
// Blender writing formats not supported by this Go code will happen, for
// example EXR files. This is fine, and shouldn't even trigger a warning.
logger.Info().Msg("output uploader: file a format I cannot decode, ignoring it")
return nil
case err != nil:
// Any other error than the above is more serious, though, because that
// could mean file corruption.
logger.Error().Err(err).Msg("output uploader: cannot decode image file")
return nil
}
logger.Debug().Str("type", fileType).Msg("output uploaded: image decoded")
// Compress the image as JPEG.
jpegBuffer := bytes.Buffer{}
options := jpeg.Options{
Quality: thumbnailJPEGQuality,
}
err = jpeg.Encode(&jpegBuffer, img, &options)
if err != nil {
logger.Error().Err(err).Msg("output uploader: cannot encode image as JPEG")
return nil
}
return jpegBuffer.Bytes()
}