Worker: upload logs in chunks to Manager
This commit is contained in:
parent
9a5047a94d
commit
66186e460e
@ -28,6 +28,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os/exec"
|
||||
"time"
|
||||
|
||||
"github.com/google/shlex"
|
||||
"github.com/rs/zerolog"
|
||||
@ -38,6 +39,8 @@ import (
|
||||
// Effectively this determines the maximum line length that can be handled.
|
||||
const StdoutBufferSize = 40 * 1024
|
||||
|
||||
const timeFormat = time.RFC3339Nano
|
||||
|
||||
type BlenderParameters struct {
|
||||
exe string // Expansion of `{blender}`: executable path + its CLI parameters defined by the Manager.
|
||||
argsBefore []string // Additional CLI arguments defined by the job compiler script, to go before the blend file name.
|
||||
@ -66,7 +69,11 @@ func (ce *CommandExecutor) cmdBlenderRender(ctx context.Context, logger zerolog.
|
||||
return err
|
||||
}
|
||||
|
||||
blenderPID := execCmd.Process.Pid
|
||||
logger = logger.With().Int("pid", blenderPID).Logger()
|
||||
|
||||
reader := bufio.NewReaderSize(outPipe, StdoutBufferSize)
|
||||
logChunker := NewLogChunker(taskID, ce.listener)
|
||||
|
||||
for {
|
||||
lineBytes, isPrefix, readErr := reader.ReadLine()
|
||||
@ -86,11 +93,16 @@ func (ce *CommandExecutor) cmdBlenderRender(ctx context.Context, logger zerolog.
|
||||
}
|
||||
|
||||
logger.Debug().Msg(line)
|
||||
// TODO: don't send the log line-by-line, but send in chunks.
|
||||
if err := ce.listener.LogProduced(ctx, taskID, line); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
timestamp := time.Now().Format(timeFormat)
|
||||
// %35s because trailing zeroes in the nanoseconds aren't output by the
|
||||
// formatted timestamp, and thus it has a variable length. Using a fixed
|
||||
// width in this Sprintf() call ensures the rest of the line aligns visually
|
||||
// with the preceeding ones.
|
||||
logLine := fmt.Sprintf("%35s: pid=%d > %s", timestamp, blenderPID, line)
|
||||
logChunker.Append(ctx, logLine)
|
||||
}
|
||||
logChunker.Flush(ctx)
|
||||
|
||||
if err := execCmd.Wait(); err != nil {
|
||||
logger.Error().Err(err).Msg("error in CLI execution")
|
||||
|
70
internal/worker/log_chunker.go
Normal file
70
internal/worker/log_chunker.go
Normal file
@ -0,0 +1,70 @@
|
||||
package worker
|
||||
|
||||
/* ***** BEGIN GPL LICENSE BLOCK *****
|
||||
*
|
||||
* Original Code Copyright (C) 2022 Blender Foundation.
|
||||
*
|
||||
* This file is part of Flamenco.
|
||||
*
|
||||
* Flamenco is free software: you can redistribute it and/or modify it under
|
||||
* the terms of the GNU General Public License as published by the Free Software
|
||||
* Foundation, either version 3 of the License, or (at your option) any later
|
||||
* version.
|
||||
*
|
||||
* Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY
|
||||
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
|
||||
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License along with
|
||||
* Flamenco. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
* ***** END GPL LICENSE BLOCK ***** */
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const defaultLogChunkerBufferFlushSize = 1024
|
||||
|
||||
// LogChunker gathers log lines in memory and sends them to a CommandListener.
|
||||
// NOTE: LogChunker is not thread-safe.
|
||||
type LogChunker struct {
|
||||
taskID string
|
||||
listener CommandListener
|
||||
buffer strings.Builder
|
||||
bufferFlushSize int // When the buffer grows beyond this many bytes, flush.
|
||||
}
|
||||
|
||||
func NewLogChunker(taskID string, listerer CommandListener) *LogChunker {
|
||||
return &LogChunker{
|
||||
taskID: taskID,
|
||||
listener: listerer,
|
||||
buffer: strings.Builder{},
|
||||
bufferFlushSize: defaultLogChunkerBufferFlushSize,
|
||||
}
|
||||
}
|
||||
|
||||
// Flush sends any buffered logs to the listener.
|
||||
func (lc *LogChunker) Flush(ctx context.Context) error {
|
||||
if lc.buffer.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := lc.listener.LogProduced(ctx, lc.taskID, lc.buffer.String())
|
||||
lc.buffer.Reset()
|
||||
return err
|
||||
}
|
||||
|
||||
func (lc *LogChunker) Append(ctx context.Context, logLines ...string) error {
|
||||
for idx := range logLines {
|
||||
lc.buffer.WriteString(logLines[idx])
|
||||
lc.buffer.WriteByte('\n')
|
||||
}
|
||||
|
||||
if lc.buffer.Len() > lc.bufferFlushSize {
|
||||
return lc.Flush(ctx)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
83
internal/worker/log_chunker_test.go
Normal file
83
internal/worker/log_chunker_test.go
Normal file
@ -0,0 +1,83 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"gitlab.com/blender/flamenco-ng-poc/internal/worker/mocks"
|
||||
)
|
||||
|
||||
/* ***** BEGIN GPL LICENSE BLOCK *****
|
||||
*
|
||||
* Original Code Copyright (C) 2022 Blender Foundation.
|
||||
*
|
||||
* This file is part of Flamenco.
|
||||
*
|
||||
* Flamenco is free software: you can redistribute it and/or modify it under
|
||||
* the terms of the GNU General Public License as published by the Free Software
|
||||
* Foundation, either version 3 of the License, or (at your option) any later
|
||||
* version.
|
||||
*
|
||||
* Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY
|
||||
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
|
||||
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License along with
|
||||
* Flamenco. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
* ***** END GPL LICENSE BLOCK ***** */
|
||||
|
||||
func TestLogChunkerEmpty(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
mockListener := mocks.NewMockCommandListener(mockCtrl)
|
||||
lc := NewLogChunker("taskID", mockListener)
|
||||
|
||||
// Note: no call to mockListener is expected.
|
||||
err := lc.Flush(context.Background())
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, 0, lc.buffer.Len(), "buffer should be empty")
|
||||
}
|
||||
|
||||
func TestLogChunkerSimple(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
mockListener := mocks.NewMockCommandListener(mockCtrl)
|
||||
lc := NewLogChunker("taskID", mockListener)
|
||||
|
||||
ctx := context.Background()
|
||||
mockListener.EXPECT().LogProduced(ctx, "taskID", "just one line\n")
|
||||
|
||||
err := lc.Append(ctx, "just one line")
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = lc.Flush(ctx)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0, lc.buffer.Len(), "buffer should be empty")
|
||||
}
|
||||
|
||||
func TestLogChunkerMuchLogging(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
mockListener := mocks.NewMockCommandListener(mockCtrl)
|
||||
lc := NewLogChunker("taskID", mockListener)
|
||||
lc.bufferFlushSize = 12
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
err := lc.Append(ctx, "één regel") // 9 runes, 11 bytes, 12 with newline, within buffer size.
|
||||
assert.NoError(t, err)
|
||||
|
||||
mockListener.EXPECT().LogProduced(ctx, "taskID", "één regel\nsecond line\n")
|
||||
|
||||
err = lc.Append(ctx, "second line") // this pushes the buffer over its max size.
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, 0, lc.buffer.Len(), "buffer should be empty")
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user