diff --git a/internal/worker/listener.go b/internal/worker/listener.go index 53c19c96..3cee8b4b 100644 --- a/internal/worker/listener.go +++ b/internal/worker/listener.go @@ -8,7 +8,6 @@ import ( "fmt" "strings" "sync" - "time" "github.com/rs/zerolog/log" @@ -24,9 +23,10 @@ var ( // Listener listens to the result of task and command execution, and sends it to the Manager. type Listener struct { - doneWg *sync.WaitGroup - client FlamencoClient - buffer UpstreamBuffer + doneWg *sync.WaitGroup + client FlamencoClient + buffer UpstreamBuffer + outputUploader *OutputUploader } // UpstreamBuffer can buffer up-stream task updates, in case the Manager cannot be reached. @@ -37,9 +37,10 @@ type UpstreamBuffer interface { // NewListener creates a new Listener that will send updates to the API client. func NewListener(client FlamencoClient, buffer UpstreamBuffer) *Listener { l := &Listener{ - doneWg: new(sync.WaitGroup), - client: client, - buffer: buffer, + doneWg: new(sync.WaitGroup), + client: client, + buffer: buffer, + outputUploader: NewOutputUploader(client), } l.doneWg.Add(1) return l @@ -49,14 +50,8 @@ func (l *Listener) Run(ctx context.Context) { defer l.doneWg.Done() defer log.Debug().Msg("listener shutting down") - for { - select { - case <-ctx.Done(): - return - case <-time.After(10 * time.Second): - log.Trace().Msg("listener is still running") - } - } + log.Debug().Msg("listener starting up") + l.outputUploader.Run(ctx) } func (l *Listener) Wait() { @@ -103,7 +98,8 @@ func (l *Listener) LogProduced(ctx context.Context, taskID string, logLines ...s // OutputProduced tells the Manager there has been some output (most commonly a rendered frame or video). func (l *Listener) OutputProduced(ctx context.Context, taskID string, outputLocation string) error { - return fmt.Errorf("Listener.OutputProduced(%q, %q): not implemented yet", taskID, outputLocation) + l.outputUploader.OutputProduced(taskID, outputLocation) + return nil } func (l *Listener) sendTaskUpdate(ctx context.Context, taskID string, update api.TaskUpdateJSONRequestBody) error { diff --git a/internal/worker/output_uploader.go b/internal/worker/output_uploader.go new file mode 100644 index 00000000..9a285b68 --- /dev/null +++ b/internal/worker/output_uploader.go @@ -0,0 +1,161 @@ +package worker + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "bytes" + "context" + "image" + "image/jpeg" + _ "image/png" + "net/http" + "os" + "sync" + + "github.com/rs/zerolog/log" + + "git.blender.org/flamenco/pkg/last_in_one_out_queue" +) + +const thumbnailJPEGQuality = 80 + +// OutputUploader sends (downscaled versions of) rendered images to Flamenco +// Manager. Only one image is sent at a time. A queue of a single image is kept, +// where newly queued images replace older ones. +type OutputUploader struct { + client FlamencoClient + queue *last_in_one_out_queue.LastInOneOutQueue[TaskOutput] +} + +type TaskOutput struct { + TaskID string + Filename string +} + +func NewOutputUploader(client FlamencoClient) *OutputUploader { + return &OutputUploader{ + client: client, + queue: last_in_one_out_queue.New[TaskOutput](), + } +} + +// OutputProduced enqueues the given filename for processing. +func (ou *OutputUploader) OutputProduced(taskID, filename string) { + // TODO: Before enqueueing (and thus overwriting any previously queued item), + // check that this file can actually be handled by the Last Rendered system of + // Flamenco. It would be a shame if a perfectly-good JPEG file is kicked off + // the queue by an EXR file we can't handle. + item := TaskOutput{taskID, filename} + ou.queue.Enqueue(item) +} + +func (ou *OutputUploader) Run(ctx context.Context) { + log.Info().Msg("output uploader: running") + defer log.Info().Msg("output uploader: shutting down") + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + ou.queue.Run(ctx) + }() + +runLoop: + for { + select { + case <-ctx.Done(): + break runLoop + case item := <-ou.queue.Item(): + ou.process(ctx, item) + } + } + + wg.Wait() +} + +// process loads the given image, converts it to JPEG, and uploads it to +// Flamenco Manager. +func (ou *OutputUploader) process(ctx context.Context, item TaskOutput) { + logger := log.With(). + Str("image", item.Filename). + Str("task", item.TaskID). + Logger() + logger.Info().Msg("output uploader: processing file before uploading to Manager") + + jpegBytes := loadAsJPEG(item.Filename) + if len(jpegBytes) == 0 { + return // loadAsJPEG() already logged the error. + } + + // Upload to Manager. + jpegReader := bytes.NewReader(jpegBytes) + resp, err := ou.client.TaskOutputProducedWithBodyWithResponse( + ctx, item.TaskID, "image/jpeg", jpegReader) + if err != nil { + logger.Error().Err(err).Msg("output uploader: unable to send image to Manager") + return + } + + // Handle the Manager response: + switch { + case resp.StatusCode() == http.StatusAccepted: + logger.Info().Msg("output uploader: Manager accepted our image") + case resp.JSON411 != nil: + logger.Error(). + Str("message", resp.JSON411.Message). + Msg("output uploader: Manager rejected our request, this is a bug in Flamenco Worker") + case resp.JSON413 != nil: + logger.Warn(). + Str("message", resp.JSON413.Message). + Msg("output uploader: Manager rejected our upload, it is too large") + case resp.JSON415 != nil: + logger.Error(). + Str("message", resp.JSON415.Message). + Msg("output uploader: Manager rejected our upload, unsupported file type") + case resp.JSON429 != nil: + logger.Warn(). + Str("message", resp.JSON429.Message). + Msg("output uploader: Manager is too busy to handle this upload") + case resp.JSONDefault != nil: + logger.Error(). + Str("message", resp.JSONDefault.Message). + Msg("output uploader: error from Manager") + default: + logger.Error(). + Str("httpStatus", resp.Status()). + Msg("output uploader: unexpected error from Manager") + } +} + +func loadAsJPEG(imagePath string) []byte { + logger := log.With().Str("image", imagePath).Logger() + + // Open the output file. + file, err := os.Open(imagePath) + if err != nil { + logger.Error().Err(err).Msg("output uploader: error opening file") + return nil + } + defer file.Close() + + // Try to decode the file as image. + img, fileType, err := image.Decode(file) + if err != nil { + logger.Error().Err(err).Msg("output uploader: cannot decode image file") + return nil + } + logger.Debug().Str("type", fileType).Msg("output uploaded: image decoded") + + // Compress the image as JPEG. + jpegBuffer := bytes.Buffer{} + options := jpeg.Options{ + Quality: thumbnailJPEGQuality, + } + err = jpeg.Encode(&jpegBuffer, img, &options) + if err != nil { + logger.Error().Err(err).Msg("output uploader: cannot encode image as JPEG") + return nil + } + + return jpegBuffer.Bytes() +} diff --git a/internal/worker/output_uploader_test.go b/internal/worker/output_uploader_test.go new file mode 100644 index 00000000..dacb61e0 --- /dev/null +++ b/internal/worker/output_uploader_test.go @@ -0,0 +1,118 @@ +package worker + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + "net/http" + "sync" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + + "git.blender.org/flamenco/internal/worker/mocks" + "git.blender.org/flamenco/pkg/api" +) + +func TestQueueOutput(t *testing.T) { + ou, mocks, finish := mockedOutputUploader(t) + defer finish() + + taskID := "094d98ba-d6e2-4765-a10b-70533604a952" + + // Run the queue process, otherwise the queued item cannot be received. + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + ou.Run(mocks.ctx) + }() + + ou.OutputProduced(taskID, "filename.jpg") + + // The output should be queued for processing now. + select { + case item := <-ou.queue.Item(): + assert.Equal(t, item.Filename, "filename.jpg") + assert.Equal(t, item.TaskID, taskID) + case <-time.After(1 * time.Second): + t.Fatal("output should be queued for processing") + } + + mocks.ctxCancel() + wg.Wait() +} + +func TestProcess(t *testing.T) { + ou, mocks, finish := mockedOutputUploader(t) + defer finish() + + taskID := "094d98ba-d6e2-4765-a10b-70533604a952" + filename := "command_ffmpeg_test_files/frame-1.png" + + item := TaskOutput{ + TaskID: taskID, + Filename: filename, + } + + { + // Test happy response from Manager. + response := api.TaskOutputProducedResponse{ + HTTPResponse: &http.Response{ + Status: "202 Accepted", + StatusCode: http.StatusAccepted, + }, + } + mocks.client.EXPECT().TaskOutputProducedWithBodyWithResponse( + mocks.ctx, taskID, "image/jpeg", gomock.Any()). + Return(&response, nil) + + ou.process(mocks.ctx, item) + } + + { + // Test unhappy response from Manager (its queue is full). + // The only difference with the happy flow is in the logging, so that's hard + // to assert for here. It's a different flow though, with a different + // non-nil pointer, so it's good to at least check it doesn't cause any + // panics. + response := api.TaskOutputProducedResponse{ + JSON429: &api.Error{ + Code: http.StatusTooManyRequests, + Message: "enhance your calm", + }, + } + mocks.client.EXPECT().TaskOutputProducedWithBodyWithResponse( + mocks.ctx, taskID, "image/jpeg", gomock.Any()). + Return(&response, nil) + + ou.process(mocks.ctx, item) + } + +} + +type outputUploaderTestMocks struct { + client *mocks.MockFlamencoClient + ctx context.Context + ctxCancel context.CancelFunc +} + +func mockedOutputUploader(t *testing.T) (*OutputUploader, *outputUploaderTestMocks, func()) { + mockCtrl := gomock.NewController(t) + + ctx, cancel := context.WithCancel(context.Background()) + mocks := outputUploaderTestMocks{ + client: mocks.NewMockFlamencoClient(mockCtrl), + ctx: ctx, + ctxCancel: cancel, + } + ou := NewOutputUploader(mocks.client) + + finish := func() { + cancel() + mockCtrl.Finish() + } + return ou, &mocks, finish +}