Worker: log chunker, also flush log after certain time

This flushes the log when the previous `Append()` call was too long ago.
Note that this doesn't flush after X seconds of silence; a call to
`Append()` or `Flush()` still has to happen in order to do the flushing.
This commit is contained in:
Sybren A. Stüvel 2022-02-22 15:34:37 +01:00
parent 45a95ca4c2
commit adf7113b34
3 changed files with 85 additions and 16 deletions

View File

@ -73,7 +73,7 @@ func (ce *CommandExecutor) cmdBlenderRender(ctx context.Context, logger zerolog.
logger = logger.With().Int("pid", blenderPID).Logger() logger = logger.With().Int("pid", blenderPID).Logger()
reader := bufio.NewReaderSize(outPipe, StdoutBufferSize) reader := bufio.NewReaderSize(outPipe, StdoutBufferSize)
logChunker := NewLogChunker(taskID, ce.listener) logChunker := NewLogChunker(taskID, ce.listener, ce.timeService)
for { for {
lineBytes, isPrefix, readErr := reader.ReadLine() lineBytes, isPrefix, readErr := reader.ReadLine()

View File

@ -23,25 +23,42 @@ package worker
import ( import (
"context" "context"
"strings" "strings"
"time"
) )
// When the buffer grows beyond this many bytes, flush.
const defaultLogChunkerBufferFlushSize = 1024 const defaultLogChunkerBufferFlushSize = 1024
// When the last flush was this long ago, flush.
const defaultLogChunkerFlushMaxInterval = 30 * time.Second
// LogChunker gathers log lines in memory and sends them to a CommandListener. // LogChunker gathers log lines in memory and sends them to a CommandListener.
// NOTE: LogChunker is not thread-safe. // NOTE: LogChunker is not thread-safe.
type LogChunker struct { type LogChunker struct {
taskID string taskID string
listener CommandListener
listener CommandListener
timeService TimeService
buffer strings.Builder buffer strings.Builder
bufferFlushSize int // When the buffer grows beyond this many bytes, flush. bufferFlushSize int // When the buffer grows beyond this many bytes, flush.
lastFlush time.Time
flushAfter time.Duration
} }
func NewLogChunker(taskID string, listerer CommandListener) *LogChunker { func NewLogChunker(taskID string, listerer CommandListener, timeService TimeService) *LogChunker {
return &LogChunker{ return &LogChunker{
taskID: taskID, taskID: taskID,
listener: listerer,
listener: listerer,
timeService: timeService,
buffer: strings.Builder{}, buffer: strings.Builder{},
bufferFlushSize: defaultLogChunkerBufferFlushSize, bufferFlushSize: defaultLogChunkerBufferFlushSize,
lastFlush: timeService.Now(),
flushAfter: defaultLogChunkerFlushMaxInterval,
} }
} }
@ -53,18 +70,32 @@ func (lc *LogChunker) Flush(ctx context.Context) error {
err := lc.listener.LogProduced(ctx, lc.taskID, lc.buffer.String()) err := lc.listener.LogProduced(ctx, lc.taskID, lc.buffer.String())
lc.buffer.Reset() lc.buffer.Reset()
lc.lastFlush = time.Now()
return err return err
} }
// Append log lines to the buffer, sending to the listener when the buffer gets too large.
func (lc *LogChunker) Append(ctx context.Context, logLines ...string) error { func (lc *LogChunker) Append(ctx context.Context, logLines ...string) error {
for idx := range logLines { for idx := range logLines {
lc.buffer.WriteString(logLines[idx]) lc.buffer.WriteString(logLines[idx])
lc.buffer.WriteByte('\n') lc.buffer.WriteByte('\n')
} }
if lc.buffer.Len() > lc.bufferFlushSize { if lc.shouldFlush() {
return lc.Flush(ctx) return lc.Flush(ctx)
} }
return nil return nil
} }
func (lc *LogChunker) shouldFlush() bool {
if lc.buffer.Len() > lc.bufferFlushSize {
return true
}
if lc.timeService.Now().Sub(lc.lastFlush) > lc.flushAfter {
return true
}
return false
}

View File

@ -3,7 +3,9 @@ package worker
import ( import (
"context" "context"
"testing" "testing"
"time"
"github.com/benbjohnson/clock"
"github.com/golang/mock/gomock" "github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"gitlab.com/blender/flamenco-ng-poc/internal/worker/mocks" "gitlab.com/blender/flamenco-ng-poc/internal/worker/mocks"
@ -29,12 +31,24 @@ import (
* *
* ***** END GPL LICENSE BLOCK ***** */ * ***** END GPL LICENSE BLOCK ***** */
type LogChunkerMocks struct {
listener *mocks.MockCommandListener
clock *clock.Mock
}
func mockedLogChunker(t *testing.T, mockCtrl *gomock.Controller) (*LogChunker, *LogChunkerMocks) {
mocks := LogChunkerMocks{
clock: mockedClock(t),
listener: mocks.NewMockCommandListener(mockCtrl),
}
lc := NewLogChunker("taskID", mocks.listener, mocks.clock)
return lc, &mocks
}
func TestLogChunkerEmpty(t *testing.T) { func TestLogChunkerEmpty(t *testing.T) {
mockCtrl := gomock.NewController(t) mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish() defer mockCtrl.Finish()
lc, _ := mockedLogChunker(t, mockCtrl)
mockListener := mocks.NewMockCommandListener(mockCtrl)
lc := NewLogChunker("taskID", mockListener)
// Note: no call to mockListener is expected. // Note: no call to mockListener is expected.
err := lc.Flush(context.Background()) err := lc.Flush(context.Background())
@ -47,11 +61,10 @@ func TestLogChunkerSimple(t *testing.T) {
mockCtrl := gomock.NewController(t) mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish() defer mockCtrl.Finish()
mockListener := mocks.NewMockCommandListener(mockCtrl) lc, mocks := mockedLogChunker(t, mockCtrl)
lc := NewLogChunker("taskID", mockListener)
ctx := context.Background() ctx := context.Background()
mockListener.EXPECT().LogProduced(ctx, "taskID", "just one line\n") mocks.listener.EXPECT().LogProduced(ctx, "taskID", "just one line\n")
err := lc.Append(ctx, "just one line") err := lc.Append(ctx, "just one line")
assert.NoError(t, err) assert.NoError(t, err)
@ -65,8 +78,7 @@ func TestLogChunkerMuchLogging(t *testing.T) {
mockCtrl := gomock.NewController(t) mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish() defer mockCtrl.Finish()
mockListener := mocks.NewMockCommandListener(mockCtrl) lc, mocks := mockedLogChunker(t, mockCtrl)
lc := NewLogChunker("taskID", mockListener)
lc.bufferFlushSize = 12 lc.bufferFlushSize = 12
ctx := context.Background() ctx := context.Background()
@ -74,10 +86,36 @@ func TestLogChunkerMuchLogging(t *testing.T) {
err := lc.Append(ctx, "één regel") // 9 runes, 11 bytes, 12 with newline, within buffer size. err := lc.Append(ctx, "één regel") // 9 runes, 11 bytes, 12 with newline, within buffer size.
assert.NoError(t, err) assert.NoError(t, err)
mockListener.EXPECT().LogProduced(ctx, "taskID", "één regel\nsecond line\n") mocks.listener.EXPECT().LogProduced(ctx, "taskID", "één regel\nsecond line\n")
err = lc.Append(ctx, "second line") // this pushes the buffer over its max size. err = lc.Append(ctx, "second line") // this pushes the buffer over its max size.
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, 0, lc.buffer.Len(), "buffer should be empty") assert.Equal(t, 0, lc.buffer.Len(), "buffer should be empty")
} }
func TestLogChunkerTimedFlush(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
lc, mocks := mockedLogChunker(t, mockCtrl)
lc.flushAfter = 2 * time.Second
ctx := context.Background()
err := lc.Append(ctx, "één regel") // No flush yet
assert.NoError(t, err)
mocks.clock.Add(2000 * time.Millisecond) // Exactly the threshold
err = lc.Append(ctx, "second line") // No flush yet
assert.NoError(t, err)
mocks.clock.Add(1 * time.Millisecond) // Juuuuust a bit longer than the threshold.
mocks.listener.EXPECT().LogProduced(ctx, "taskID", "één regel\nsecond line\nthird line\n")
err = lc.Append(ctx, "third line") // This should flush due to the long wait.
assert.NoError(t, err)
assert.Equal(t, 0, lc.buffer.Len(), "buffer should be empty")
}