flamenco/internal/worker/output_uploader.go
Sybren A. Stüvel 02fac6a4df Change Go package name from git.blender.org to projects.blender.org
Change the package base name of the Go code, from
`git.blender.org/flamenco` to `projects.blender.org/studio/flamenco`.

The old location, `git.blender.org`, has no longer been use since the
[migration to Gitea][1]. The new package names now reflect the actual
location where Flamenco is hosted.

[1]: https://code.blender.org/2023/02/new-blender-development-infrastructure/
2023-08-01 12:42:31 +02:00

172 lines
4.7 KiB
Go

package worker
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"bytes"
"context"
"errors"
"image"
"image/jpeg"
_ "image/png"
"net/http"
"os"
"sync"
"github.com/rs/zerolog/log"
_ "golang.org/x/image/tiff"
"projects.blender.org/studio/flamenco/pkg/last_in_one_out_queue"
)
const thumbnailJPEGQuality = 85
// OutputUploader sends (downscaled versions of) rendered images to Flamenco
// Manager. Only one image is sent at a time. A queue of a single image is kept,
// where newly queued images replace older ones.
type OutputUploader struct {
client FlamencoClient
queue *last_in_one_out_queue.LastInOneOutQueue[TaskOutput]
}
type TaskOutput struct {
TaskID string
Filename string
}
func NewOutputUploader(client FlamencoClient) *OutputUploader {
return &OutputUploader{
client: client,
queue: last_in_one_out_queue.New[TaskOutput](),
}
}
// OutputProduced enqueues the given filename for processing.
func (ou *OutputUploader) OutputProduced(taskID, filename string) {
// TODO: Before enqueueing (and thus overwriting any previously queued item),
// check that this file can actually be handled by the Last Rendered system of
// Flamenco. It would be a shame if a perfectly-good JPEG file is kicked off
// the queue by an EXR file we can't handle.
item := TaskOutput{taskID, filename}
ou.queue.Enqueue(item)
}
func (ou *OutputUploader) Run(ctx context.Context) {
log.Info().Msg("output uploader: running")
defer log.Info().Msg("output uploader: shutting down")
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
ou.queue.Run(ctx)
}()
runLoop:
for {
select {
case <-ctx.Done():
break runLoop
case item := <-ou.queue.Item():
ou.process(ctx, item)
}
}
wg.Wait()
}
// process loads the given image, converts it to JPEG, and uploads it to
// Flamenco Manager.
func (ou *OutputUploader) process(ctx context.Context, item TaskOutput) {
logger := log.With().
Str("image", item.Filename).
Str("task", item.TaskID).
Logger()
logger.Info().Msg("output uploader: processing file before uploading to Manager")
jpegBytes := loadAsJPEG(item.Filename)
if len(jpegBytes) == 0 {
return // loadAsJPEG() already logged the error.
}
// Upload to Manager.
jpegReader := bytes.NewReader(jpegBytes)
resp, err := ou.client.TaskOutputProducedWithBodyWithResponse(
ctx, item.TaskID, "image/jpeg", jpegReader)
if err != nil {
logger.Error().Err(err).Msg("output uploader: unable to send image to Manager")
return
}
// Handle the Manager response:
switch {
case resp.StatusCode() == http.StatusAccepted:
logger.Info().Msg("output uploader: Manager accepted our image")
case resp.JSON411 != nil:
logger.Error().
Str("message", resp.JSON411.Message).
Msg("output uploader: Manager rejected our request, this is a bug in Flamenco Worker")
case resp.JSON413 != nil:
logger.Warn().
Str("message", resp.JSON413.Message).
Msg("output uploader: Manager rejected our upload, it is too large")
case resp.JSON415 != nil:
logger.Error().
Str("message", resp.JSON415.Message).
Msg("output uploader: Manager rejected our upload, unsupported file type")
case resp.JSON429 != nil:
logger.Warn().
Str("message", resp.JSON429.Message).
Msg("output uploader: Manager is too busy to handle this upload")
case resp.JSONDefault != nil:
logger.Error().
Str("message", resp.JSONDefault.Message).
Msg("output uploader: error from Manager")
default:
logger.Error().
Str("httpStatus", resp.Status()).
Msg("output uploader: unexpected error from Manager")
}
}
func loadAsJPEG(imagePath string) []byte {
logger := log.With().Str("image", imagePath).Logger()
// Open the output file.
file, err := os.Open(imagePath)
if err != nil {
logger.Error().Err(err).Msg("output uploader: error opening file")
return nil
}
defer file.Close()
// Try to decode the file as image.
img, fileType, err := image.Decode(file)
switch {
case errors.Is(err, image.ErrFormat):
// Blender writing formats not supported by this Go code will happen, for
// example EXR files. This is fine, and shouldn't even trigger a warning.
logger.Info().Msg("output uploader: file a format I cannot decode, ignoring it")
return nil
case err != nil:
// Any other error than the above is more serious, though, because that
// could mean file corruption.
logger.Error().Err(err).Msg("output uploader: cannot decode image file")
return nil
}
logger.Debug().Str("type", fileType).Msg("output uploaded: image decoded")
// Compress the image as JPEG.
jpegBuffer := bytes.Buffer{}
options := jpeg.Options{
Quality: thumbnailJPEGQuality,
}
err = jpeg.Encode(&jpegBuffer, img, &options)
if err != nil {
logger.Error().Err(err).Msg("output uploader: cannot encode image as JPEG")
return nil
}
return jpegBuffer.Bytes()
}