Manager: move broadcasting of task logs via SocketIO to task log service

To ensure all task logs also get broadcast via SocketIO, the responsibility
has moved from the `api_impl` to the `task_logs` package.
This commit is contained in:
Sybren A. Stüvel 2022-06-09 16:49:48 +02:00
parent 04dd479248
commit 819cad1d18
8 changed files with 144 additions and 59 deletions

View File

@ -159,7 +159,7 @@ func buildFlamencoAPI(
if err != nil { if err != nil {
log.Fatal().Err(err).Msg("error loading job compilers") log.Fatal().Err(err).Msg("error loading job compilers")
} }
logStorage := task_logs.NewStorage(configService.Get().TaskLogsPath) logStorage := task_logs.NewStorage(configService.Get().TaskLogsPath, webUpdater)
shamanServer := shaman.NewServer(configService.Get().Shaman, nil) shamanServer := shaman.NewServer(configService.Get().Shaman, nil)
flamenco := api_impl.NewFlamenco( flamenco := api_impl.NewFlamenco(
compiler, persist, webUpdater, logStorage, configService, compiler, persist, webUpdater, logStorage, configService,

View File

@ -72,7 +72,9 @@ type ChangeBroadcaster interface {
// after the job's tasks have been created, and thus there is no need for a // after the job's tasks have been created, and thus there is no need for a
// separate broadcast per task. // separate broadcast per task.
BroadcastTaskLogUpdate(taskLogUpdate api.SocketIOTaskLogUpdate) // Note that there is no call to BoardcastTaskLogUpdate. It's the
// responsibility of `LogStorage.Write` to broadcast the changes to SocketIO
// clients.
BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate) BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate)
BroadcastNewWorker(workerUpdate api.SocketIOWorkerUpdate) BroadcastNewWorker(workerUpdate api.SocketIOWorkerUpdate)

View File

@ -305,18 +305,6 @@ func (mr *MockChangeBroadcasterMockRecorder) BroadcastNewWorker(arg0 interface{}
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastNewWorker", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastNewWorker), arg0) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastNewWorker", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastNewWorker), arg0)
} }
// BroadcastTaskLogUpdate mocks base method.
func (m *MockChangeBroadcaster) BroadcastTaskLogUpdate(arg0 api.SocketIOTaskLogUpdate) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "BroadcastTaskLogUpdate", arg0)
}
// BroadcastTaskLogUpdate indicates an expected call of BroadcastTaskLogUpdate.
func (mr *MockChangeBroadcasterMockRecorder) BroadcastTaskLogUpdate(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastTaskLogUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastTaskLogUpdate), arg0)
}
// BroadcastWorkerUpdate mocks base method. // BroadcastWorkerUpdate mocks base method.
func (m *MockChangeBroadcaster) BroadcastWorkerUpdate(arg0 api.SocketIOWorkerUpdate) { func (m *MockChangeBroadcaster) BroadcastWorkerUpdate(arg0 api.SocketIOWorkerUpdate) {
m.ctrl.T.Helper() m.ctrl.T.Helper()

View File

@ -441,7 +441,8 @@ func (f *Flamenco) doTaskUpdate(
} }
if update.Log != nil { if update.Log != nil {
f.taskLogAppend(logger, dbTask, *update.Log) // Errors writing the log to disk are already logged by logStorage, and can be safely ignored here.
_ = f.logStorage.Write(logger, dbTask.Job.UUID, dbTask.UUID, *update.Log)
} }
// Any error updating the status is more important than an error updating the // Any error updating the status is more important than an error updating the
@ -465,25 +466,10 @@ func (f *Flamenco) workerPingedTask(
return nil return nil
} }
// taskLogAppend appends a chunk of log lines to the task's log, and broadcasts it over SocketIO.
func (f *Flamenco) taskLogAppend(logger zerolog.Logger, dbTask *persistence.Task, logChunk string) {
// Errors writing the log to file should be logged in our own logging
// system, but shouldn't ripple up. As such, `err` is not returned to
// the caller.
err := f.logStorage.Write(logger, dbTask.Job.UUID, dbTask.UUID, logChunk)
if err != nil {
logger.Error().Err(err).Msg("error writing task log")
}
// Broadcast the task log to SocketIO clients.
taskUpdate := webupdates.NewTaskLogUpdate(dbTask.UUID, logChunk)
f.broadcaster.BroadcastTaskLogUpdate(taskUpdate)
}
// taskLogAppendTimestamped writes the given log text, prefixed with the current date & time, to the task's log. // taskLogAppendTimestamped writes the given log text, prefixed with the current date & time, to the task's log.
func (f *Flamenco) taskLogAppendTimestamped(logger zerolog.Logger, dbTask *persistence.Task, logText string) { func (f *Flamenco) taskLogAppendTimestamped(logger zerolog.Logger, dbTask *persistence.Task, logText string) {
now := f.clock.Now().Format(time.RFC3339) now := f.clock.Now().Format(time.RFC3339)
f.taskLogAppend(logger, dbTask, now+" "+logText) _ = f.logStorage.Write(logger, dbTask.Job.UUID, dbTask.UUID, now+" "+logText)
} }
func (f *Flamenco) MayWorkerRun(e echo.Context, taskID string) error { func (f *Flamenco) MayWorkerRun(e echo.Context, taskID string) error {

View File

@ -38,7 +38,6 @@ func TestTaskScheduleHappy(t *testing.T) {
mf.logStorage.EXPECT().Write(gomock.Any(), job.UUID, task.UUID, mf.logStorage.EXPECT().Write(gomock.Any(), job.UUID, task.UUID,
"2022-06-09T11:14:41+02:00 Task assigned to worker дрон (e7632d62-c3b8-4af0-9e78-01752928952c)\n") "2022-06-09T11:14:41+02:00 Task assigned to worker дрон (e7632d62-c3b8-4af0-9e78-01752928952c)\n")
mf.broadcaster.EXPECT().BroadcastTaskLogUpdate(gomock.Any()) // The task log should be updated; this test assumes the contents are ok.
err := mf.flamenco.ScheduleTask(echo) err := mf.flamenco.ScheduleTask(echo)
assert.NoError(t, err) assert.NoError(t, err)
@ -171,8 +170,6 @@ func TestWorkerSignoffTaskRequeue(t *testing.T) {
logMsg := "2022-06-09T11:14:41+02:00 Task was requeued by Manager because the worker assigned to it signed off.\n" logMsg := "2022-06-09T11:14:41+02:00 Task was requeued by Manager because the worker assigned to it signed off.\n"
mf.logStorage.EXPECT().Write(gomock.Any(), job.UUID, task1.UUID, logMsg) mf.logStorage.EXPECT().Write(gomock.Any(), job.UUID, task1.UUID, logMsg)
mf.logStorage.EXPECT().Write(gomock.Any(), job.UUID, task2.UUID, logMsg) mf.logStorage.EXPECT().Write(gomock.Any(), job.UUID, task2.UUID, logMsg)
mf.broadcaster.EXPECT().BroadcastTaskLogUpdate(api.SocketIOTaskLogUpdate{TaskId: task1.UUID, Log: logMsg})
mf.broadcaster.EXPECT().BroadcastTaskLogUpdate(api.SocketIOTaskLogUpdate{TaskId: task2.UUID, Log: logMsg})
// Expect worker to be saved as 'offline'. // Expect worker to be saved as 'offline'.
mf.persistence.EXPECT(). mf.persistence.EXPECT().
@ -389,10 +386,6 @@ func TestTaskUpdate(t *testing.T) {
// Expect the log to be written and broadcast over SocketIO. // Expect the log to be written and broadcast over SocketIO.
mf.logStorage.EXPECT().Write(gomock.Any(), jobID, taskID, "line1\nline2\n") mf.logStorage.EXPECT().Write(gomock.Any(), jobID, taskID, "line1\nline2\n")
mf.broadcaster.EXPECT().BroadcastTaskLogUpdate(api.SocketIOTaskLogUpdate{
TaskId: taskID,
Log: "line1\nline2\n",
})
// Expect a 'touch' of the task. // Expect a 'touch' of the task.
var touchedTask persistence.Task var touchedTask persistence.Task

View File

@ -0,0 +1,47 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: git.blender.org/flamenco/internal/manager/task_logs (interfaces: ChangeBroadcaster)
// Package mocks is a generated GoMock package.
package mocks
import (
reflect "reflect"
api "git.blender.org/flamenco/pkg/api"
gomock "github.com/golang/mock/gomock"
)
// MockChangeBroadcaster is a mock of ChangeBroadcaster interface.
type MockChangeBroadcaster struct {
ctrl *gomock.Controller
recorder *MockChangeBroadcasterMockRecorder
}
// MockChangeBroadcasterMockRecorder is the mock recorder for MockChangeBroadcaster.
type MockChangeBroadcasterMockRecorder struct {
mock *MockChangeBroadcaster
}
// NewMockChangeBroadcaster creates a new mock instance.
func NewMockChangeBroadcaster(ctrl *gomock.Controller) *MockChangeBroadcaster {
mock := &MockChangeBroadcaster{ctrl: ctrl}
mock.recorder = &MockChangeBroadcasterMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockChangeBroadcaster) EXPECT() *MockChangeBroadcasterMockRecorder {
return m.recorder
}
// BroadcastTaskLogUpdate mocks base method.
func (m *MockChangeBroadcaster) BroadcastTaskLogUpdate(arg0 api.SocketIOTaskLogUpdate) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "BroadcastTaskLogUpdate", arg0)
}
// BroadcastTaskLogUpdate indicates an expected call of BroadcastTaskLogUpdate.
func (mr *MockChangeBroadcasterMockRecorder) BroadcastTaskLogUpdate(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastTaskLogUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastTaskLogUpdate), arg0)
}

View File

@ -11,6 +11,8 @@ import (
"path/filepath" "path/filepath"
"sync" "sync"
"git.blender.org/flamenco/internal/manager/webupdates"
"git.blender.org/flamenco/pkg/api"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
@ -24,13 +26,29 @@ const (
type Storage struct { type Storage struct {
BasePath string // Directory where task logs are stored. BasePath string // Directory where task logs are stored.
broadcaster ChangeBroadcaster
// Locks to only allow one goroutine at a time to handle the logs of a certain task. // Locks to only allow one goroutine at a time to handle the logs of a certain task.
mutex *sync.Mutex mutex *sync.Mutex
taskLocks map[string]*sync.Mutex taskLocks map[string]*sync.Mutex
} }
// Generate mock implementations of these interfaces.
//go:generate go run github.com/golang/mock/mockgen -destination mocks/interfaces_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/task_logs ChangeBroadcaster
type ChangeBroadcaster interface {
// BroadcastTaskLogUpdate sends the task log update to SocketIO clients.
BroadcastTaskLogUpdate(taskLogUpdate api.SocketIOTaskLogUpdate)
}
// ChangeBroadcaster should be a subset of webupdates.BiDirComms
var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil)
// NewStorage creates a new log storage rooted at `basePath`. // NewStorage creates a new log storage rooted at `basePath`.
func NewStorage(basePath string) *Storage { func NewStorage(
basePath string,
broadcaster ChangeBroadcaster,
) *Storage {
if !filepath.IsAbs(basePath) { if !filepath.IsAbs(basePath) {
absPath, err := filepath.Abs(basePath) absPath, err := filepath.Abs(basePath)
if err != nil { if err != nil {
@ -45,12 +63,32 @@ func NewStorage(basePath string) *Storage {
return &Storage{ return &Storage{
BasePath: basePath, BasePath: basePath,
broadcaster: broadcaster,
mutex: new(sync.Mutex), mutex: new(sync.Mutex),
taskLocks: make(map[string]*sync.Mutex), taskLocks: make(map[string]*sync.Mutex),
} }
} }
// Write appends text to a task's log file, and broadcasts the log lines via SocketIO.
func (s *Storage) Write(logger zerolog.Logger, jobID, taskID string, logText string) error { func (s *Storage) Write(logger zerolog.Logger, jobID, taskID string, logText string) error {
if err := s.writeToDisk(logger, jobID, taskID, logText); err != nil {
return err
}
// Broadcast the task log to SocketIO clients.
taskUpdate := webupdates.NewTaskLogUpdate(taskID, logText)
s.broadcaster.BroadcastTaskLogUpdate(taskUpdate)
return nil
}
// // Write appends text, prefixed with the current date & time, to a task's log file,
// // and broadcasts the log lines via SocketIO.
// func (s *Storage) WriteTimestamped(logger zerolog.Logger, jobID, taskID string, logText string) error {
// now := s.clock.Now().Format(time.RFC3339)
// return s.Write(logger, jobID, taskID, now+" "+logText)
// }
func (s *Storage) writeToDisk(logger zerolog.Logger, jobID, taskID string, logText string) error {
// Shortcut to avoid creating an empty log file. It also solves an // Shortcut to avoid creating an empty log file. It also solves an
// index out of bounds error further down when we check the last character. // index out of bounds error further down when we check the last character.
if logText == "" { if logText == "" {

View File

@ -11,22 +11,20 @@ import (
"sync" "sync"
"testing" "testing"
"git.blender.org/flamenco/internal/manager/task_logs/mocks"
"github.com/benbjohnson/clock"
"github.com/golang/mock/gomock"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func tempStorage() *Storage {
temppath, err := ioutil.TempDir("", "testlogs")
if err != nil {
panic(err)
}
return NewStorage(temppath)
}
func TestLogWriting(t *testing.T) { func TestLogWriting(t *testing.T) {
s := tempStorage() s, finish, mocks := taskLogsTestFixtures(t)
defer os.RemoveAll(s.BasePath) defer finish()
// Expect broadcastst for each call to s.Write()
mocks.broadcaster.EXPECT().BroadcastTaskLogUpdate(gomock.Any()).Times(2)
err := s.Write(zerolog.Nop(), err := s.Write(zerolog.Nop(),
"25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c", "25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c",
@ -52,8 +50,10 @@ func TestLogWriting(t *testing.T) {
} }
func TestLogRotation(t *testing.T) { func TestLogRotation(t *testing.T) {
s := tempStorage() s, finish, mocks := taskLogsTestFixtures(t)
defer os.RemoveAll(s.BasePath) defer finish()
mocks.broadcaster.EXPECT().BroadcastTaskLogUpdate(gomock.Any())
err := s.Write(zerolog.Nop(), err := s.Write(zerolog.Nop(),
"25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c", "25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c",
@ -81,12 +81,15 @@ func TestLogRotation(t *testing.T) {
} }
func TestLogTail(t *testing.T) { func TestLogTail(t *testing.T) {
s := tempStorage() s, finish, mocks := taskLogsTestFixtures(t)
defer os.RemoveAll(s.BasePath) defer finish()
jobID := "25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c" jobID := "25c5a51c-e0dd-44f7-9f87-74f3d1fbbd8c"
taskID := "20ff9d06-53ec-4019-9e2e-1774f05f170a" taskID := "20ff9d06-53ec-4019-9e2e-1774f05f170a"
// Expect broadcastst for each call to s.Write()
mocks.broadcaster.EXPECT().BroadcastTaskLogUpdate(gomock.Any()).Times(3)
contents, err := s.Tail(jobID, taskID) contents, err := s.Tail(jobID, taskID)
assert.ErrorIs(t, err, os.ErrNotExist) assert.ErrorIs(t, err, os.ErrNotExist)
assert.Equal(t, "", contents) assert.Equal(t, "", contents)
@ -142,9 +145,8 @@ func TestLogTail(t *testing.T) {
} }
func TestLogWritingParallel(t *testing.T) { func TestLogWritingParallel(t *testing.T) {
s := tempStorage() s, finish, mocks := taskLogsTestFixtures(t)
defer os.RemoveAll(s.BasePath) defer finish()
// defer t.Errorf("not removing %s", s.BasePath)
numGoroutines := 1000 // How many goroutines run in parallel. numGoroutines := 1000 // How many goroutines run in parallel.
runLength := 100 // How many characters are logged, per goroutine. runLength := 100 // How many characters are logged, per goroutine.
@ -154,6 +156,8 @@ func TestLogWritingParallel(t *testing.T) {
jobID := "6d9a05a1-261e-4f6f-93b0-8c4f6b6d500d" jobID := "6d9a05a1-261e-4f6f-93b0-8c4f6b6d500d"
taskID := "d19888cc-c389-4a24-aebf-8458ababdb02" taskID := "d19888cc-c389-4a24-aebf-8458ababdb02"
mocks.broadcaster.EXPECT().BroadcastTaskLogUpdate(gomock.Any()).Times(numGoroutines)
for i := 0; i < numGoroutines; i++ { for i := 0; i < numGoroutines; i++ {
// Write lines of 100 characters to the task log. Each goroutine writes a // Write lines of 100 characters to the task log. Each goroutine writes a
// different character, starting at 'A'. // different character, starting at 'A'.
@ -188,3 +192,30 @@ func TestLogWritingParallel(t *testing.T) {
} }
} }
} }
type TaskLogsMocks struct {
clock *clock.Mock
broadcaster *mocks.MockChangeBroadcaster
}
func taskLogsTestFixtures(t *testing.T) (*Storage, func(), *TaskLogsMocks) {
mockCtrl := gomock.NewController(t)
mocks := &TaskLogsMocks{
broadcaster: mocks.NewMockChangeBroadcaster(mockCtrl),
}
temppath, err := ioutil.TempDir("", "testlogs")
if err != nil {
panic(err)
}
// This should be called at the end of each unit test.
finish := func() {
os.RemoveAll(temppath)
mockCtrl.Finish()
}
sm := NewStorage(temppath, mocks.broadcaster)
return sm, finish, mocks
}