diff --git a/internal/worker/command_blender.go b/internal/worker/command_blender.go index e04d62be..02683a29 100644 --- a/internal/worker/command_blender.go +++ b/internal/worker/command_blender.go @@ -73,7 +73,7 @@ func (ce *CommandExecutor) cmdBlenderRender(ctx context.Context, logger zerolog. logger = logger.With().Int("pid", blenderPID).Logger() reader := bufio.NewReaderSize(outPipe, StdoutBufferSize) - logChunker := NewLogChunker(taskID, ce.listener) + logChunker := NewLogChunker(taskID, ce.listener, ce.timeService) for { lineBytes, isPrefix, readErr := reader.ReadLine() diff --git a/internal/worker/log_chunker.go b/internal/worker/log_chunker.go index 70837776..4b918183 100644 --- a/internal/worker/log_chunker.go +++ b/internal/worker/log_chunker.go @@ -23,25 +23,42 @@ package worker import ( "context" "strings" + "time" ) +// When the buffer grows beyond this many bytes, flush. const defaultLogChunkerBufferFlushSize = 1024 +// When the last flush was this long ago, flush. +const defaultLogChunkerFlushMaxInterval = 30 * time.Second + // LogChunker gathers log lines in memory and sends them to a CommandListener. // NOTE: LogChunker is not thread-safe. type LogChunker struct { - taskID string - listener CommandListener + taskID string + + listener CommandListener + timeService TimeService + buffer strings.Builder bufferFlushSize int // When the buffer grows beyond this many bytes, flush. + + lastFlush time.Time + flushAfter time.Duration } -func NewLogChunker(taskID string, listerer CommandListener) *LogChunker { +func NewLogChunker(taskID string, listerer CommandListener, timeService TimeService) *LogChunker { return &LogChunker{ - taskID: taskID, - listener: listerer, + taskID: taskID, + + listener: listerer, + timeService: timeService, + buffer: strings.Builder{}, bufferFlushSize: defaultLogChunkerBufferFlushSize, + + lastFlush: timeService.Now(), + flushAfter: defaultLogChunkerFlushMaxInterval, } } @@ -53,18 +70,32 @@ func (lc *LogChunker) Flush(ctx context.Context) error { err := lc.listener.LogProduced(ctx, lc.taskID, lc.buffer.String()) lc.buffer.Reset() + lc.lastFlush = time.Now() return err } +// Append log lines to the buffer, sending to the listener when the buffer gets too large. func (lc *LogChunker) Append(ctx context.Context, logLines ...string) error { for idx := range logLines { lc.buffer.WriteString(logLines[idx]) lc.buffer.WriteByte('\n') } - if lc.buffer.Len() > lc.bufferFlushSize { + if lc.shouldFlush() { return lc.Flush(ctx) } return nil } + +func (lc *LogChunker) shouldFlush() bool { + if lc.buffer.Len() > lc.bufferFlushSize { + return true + } + + if lc.timeService.Now().Sub(lc.lastFlush) > lc.flushAfter { + return true + } + + return false +} diff --git a/internal/worker/log_chunker_test.go b/internal/worker/log_chunker_test.go index 3023716f..d2cba7ee 100644 --- a/internal/worker/log_chunker_test.go +++ b/internal/worker/log_chunker_test.go @@ -3,7 +3,9 @@ package worker import ( "context" "testing" + "time" + "github.com/benbjohnson/clock" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "gitlab.com/blender/flamenco-ng-poc/internal/worker/mocks" @@ -29,12 +31,24 @@ import ( * * ***** END GPL LICENSE BLOCK ***** */ +type LogChunkerMocks struct { + listener *mocks.MockCommandListener + clock *clock.Mock +} + +func mockedLogChunker(t *testing.T, mockCtrl *gomock.Controller) (*LogChunker, *LogChunkerMocks) { + mocks := LogChunkerMocks{ + clock: mockedClock(t), + listener: mocks.NewMockCommandListener(mockCtrl), + } + lc := NewLogChunker("taskID", mocks.listener, mocks.clock) + return lc, &mocks +} + func TestLogChunkerEmpty(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - - mockListener := mocks.NewMockCommandListener(mockCtrl) - lc := NewLogChunker("taskID", mockListener) + lc, _ := mockedLogChunker(t, mockCtrl) // Note: no call to mockListener is expected. err := lc.Flush(context.Background()) @@ -47,11 +61,10 @@ func TestLogChunkerSimple(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockListener := mocks.NewMockCommandListener(mockCtrl) - lc := NewLogChunker("taskID", mockListener) + lc, mocks := mockedLogChunker(t, mockCtrl) ctx := context.Background() - mockListener.EXPECT().LogProduced(ctx, "taskID", "just one line\n") + mocks.listener.EXPECT().LogProduced(ctx, "taskID", "just one line\n") err := lc.Append(ctx, "just one line") assert.NoError(t, err) @@ -65,8 +78,7 @@ func TestLogChunkerMuchLogging(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockListener := mocks.NewMockCommandListener(mockCtrl) - lc := NewLogChunker("taskID", mockListener) + lc, mocks := mockedLogChunker(t, mockCtrl) lc.bufferFlushSize = 12 ctx := context.Background() @@ -74,10 +86,36 @@ func TestLogChunkerMuchLogging(t *testing.T) { err := lc.Append(ctx, "één regel") // 9 runes, 11 bytes, 12 with newline, within buffer size. assert.NoError(t, err) - mockListener.EXPECT().LogProduced(ctx, "taskID", "één regel\nsecond line\n") + mocks.listener.EXPECT().LogProduced(ctx, "taskID", "één regel\nsecond line\n") err = lc.Append(ctx, "second line") // this pushes the buffer over its max size. assert.NoError(t, err) assert.Equal(t, 0, lc.buffer.Len(), "buffer should be empty") } + +func TestLogChunkerTimedFlush(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + lc, mocks := mockedLogChunker(t, mockCtrl) + lc.flushAfter = 2 * time.Second + + ctx := context.Background() + + err := lc.Append(ctx, "één regel") // No flush yet + assert.NoError(t, err) + + mocks.clock.Add(2000 * time.Millisecond) // Exactly the threshold + err = lc.Append(ctx, "second line") // No flush yet + assert.NoError(t, err) + + mocks.clock.Add(1 * time.Millisecond) // Juuuuust a bit longer than the threshold. + + mocks.listener.EXPECT().LogProduced(ctx, "taskID", "één regel\nsecond line\nthird line\n") + + err = lc.Append(ctx, "third line") // This should flush due to the long wait. + assert.NoError(t, err) + + assert.Equal(t, 0, lc.buffer.Len(), "buffer should be empty") +}