Manager: connect SocketIO broadcaster with job creation

This commit is contained in:
Sybren A. Stüvel 2022-04-05 16:19:33 +02:00
parent 0c0df41f5d
commit a715b3bfbe
9 changed files with 126 additions and 29 deletions

View File

@ -141,7 +141,7 @@ func buildFlamencoAPI(configService *config.Service, persist *persistence.DB, we
logStorage := task_logs.NewStorage(configService.Get().TaskLogsPath)
taskStateMachine := task_state_machine.NewStateMachine(persist, webUpdater)
shamanServer := shaman.NewServer(configService.Get().Shaman, nil)
flamenco := api_impl.NewFlamenco(compiler, persist, logStorage, configService, taskStateMachine, shamanServer)
flamenco := api_impl.NewFlamenco(compiler, persist, webUpdater, logStorage, configService, taskStateMachine, shamanServer)
return flamenco
}

View File

@ -21,6 +21,7 @@ import (
type Flamenco struct {
jobCompiler JobCompiler
persist PersistenceService
broadcaster ChangeBroadcaster
logStorage LogStorage
config ConfigService
stateMachine TaskStateMachine
@ -30,7 +31,7 @@ type Flamenco struct {
var _ api.ServerInterface = (*Flamenco)(nil)
// Generate mock implementations of these interfaces.
//go:generate go run github.com/golang/mock/mockgen -destination mocks/api_impl_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/api_impl PersistenceService,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman
//go:generate go run github.com/golang/mock/mockgen -destination mocks/api_impl_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/api_impl PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman
type PersistenceService interface {
StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error
@ -123,6 +124,7 @@ var _ Shaman = (*shaman.Server)(nil)
func NewFlamenco(
jc JobCompiler,
jps PersistenceService,
b ChangeBroadcaster,
ls LogStorage,
cs ConfigService,
sm TaskStateMachine,
@ -131,6 +133,7 @@ func NewFlamenco(
return &Flamenco{
jobCompiler: jc,
persist: jps,
broadcaster: b,
logStorage: ls,
config: cs,
stateMachine: sm,

View File

@ -67,6 +67,13 @@ func (f *Flamenco) SubmitJob(e echo.Context) error {
return sendAPIError(e, http.StatusInternalServerError, "error retrieving job from database")
}
jobUpdate := api.JobUpdate{
Id: dbJob.UUID,
Status: dbJob.Status,
Updated: dbJob.UpdatedAt,
}
f.broadcaster.BroadcastNewJob(jobUpdate)
apiJob := jobDBtoAPI(dbJob)
return e.JSON(http.StatusOK, apiJob)
}

View File

@ -5,7 +5,9 @@ package api_impl
import (
"context"
"testing"
"time"
"git.blender.org/flamenco/internal/manager/job_compilers"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/pkg/api"
"github.com/golang/mock/gomock"
@ -16,6 +18,63 @@ func ptr[T any](value T) *T {
return &value
}
func TestSubmitJob(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mf := newMockedFlamenco(mockCtrl)
worker := testWorker()
submittedJob := api.SubmittedJob{
Name: "поднео посао",
Type: "test",
Priority: 50,
}
// Expect the job compiler to be called.
authoredJob := job_compilers.AuthoredJob{
JobID: "afc47568-bd9d-4368-8016-e91d945db36d",
Name: submittedJob.Name,
JobType: submittedJob.Type,
Priority: submittedJob.Priority,
Status: api.JobStatusUnderConstruction,
Created: time.Now(),
}
mf.jobCompiler.EXPECT().Compile(gomock.Any(), submittedJob).Return(&authoredJob, nil)
// Expect the job to be saved with 'queued' status:
queuedJob := authoredJob
queuedJob.Status = api.JobStatusQueued
mf.persistence.EXPECT().StoreAuthoredJob(gomock.Any(), queuedJob).Return(nil)
// Expect the job to be fetched from the database again:
dbJob := persistence.Job{
UUID: queuedJob.JobID,
Name: queuedJob.Name,
JobType: queuedJob.JobType,
Priority: queuedJob.Priority,
Status: queuedJob.Status,
Settings: persistence.StringInterfaceMap{},
Metadata: persistence.StringStringMap{},
}
mf.persistence.EXPECT().FetchJob(gomock.Any(), queuedJob.JobID).Return(&dbJob, nil)
// Expect the new job to be broadcast.
jobUpdate := api.JobUpdate{
Id: dbJob.UUID,
Updated: dbJob.UpdatedAt,
Status: dbJob.Status,
}
mf.broadcaster.EXPECT().BroadcastNewJob(jobUpdate)
// Do the call.
echoCtx := mf.prepareMockedJSONRequest(submittedJob)
requestWorkerStore(echoCtx, &worker)
err := mf.flamenco.SubmitJob(echoCtx)
assert.NoError(t, err)
}
func TestTaskUpdate(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

View File

@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: git.blender.org/flamenco/internal/manager/api_impl (interfaces: PersistenceService,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman)
// Source: git.blender.org/flamenco/internal/manager/api_impl (interfaces: PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman)
// Package mocks is a generated GoMock package.
package mocks
@ -214,6 +214,41 @@ func (mr *MockPersistenceServiceMockRecorder) StoreAuthoredJob(arg0, arg1 interf
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StoreAuthoredJob", reflect.TypeOf((*MockPersistenceService)(nil).StoreAuthoredJob), arg0, arg1)
}
// MockChangeBroadcaster is a mock of ChangeBroadcaster interface.
type MockChangeBroadcaster struct {
ctrl *gomock.Controller
recorder *MockChangeBroadcasterMockRecorder
}
// MockChangeBroadcasterMockRecorder is the mock recorder for MockChangeBroadcaster.
type MockChangeBroadcasterMockRecorder struct {
mock *MockChangeBroadcaster
}
// NewMockChangeBroadcaster creates a new mock instance.
func NewMockChangeBroadcaster(ctrl *gomock.Controller) *MockChangeBroadcaster {
mock := &MockChangeBroadcaster{ctrl: ctrl}
mock.recorder = &MockChangeBroadcasterMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockChangeBroadcaster) EXPECT() *MockChangeBroadcasterMockRecorder {
return m.recorder
}
// BroadcastNewJob mocks base method.
func (m *MockChangeBroadcaster) BroadcastNewJob(arg0 api.JobUpdate) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "BroadcastNewJob", arg0)
}
// BroadcastNewJob indicates an expected call of BroadcastNewJob.
func (mr *MockChangeBroadcasterMockRecorder) BroadcastNewJob(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastNewJob", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastNewJob), arg0)
}
// MockJobCompiler is a mock of JobCompiler interface.
type MockJobCompiler struct {
ctrl *gomock.Controller

View File

@ -22,6 +22,7 @@ type mockedFlamenco struct {
flamenco *Flamenco
jobCompiler *mocks.MockJobCompiler
persistence *mocks.MockPersistenceService
broadcaster *mocks.MockChangeBroadcaster
logStorage *mocks.MockLogStorage
config *mocks.MockConfigService
stateMachine *mocks.MockTaskStateMachine
@ -31,16 +32,18 @@ type mockedFlamenco struct {
func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco {
jc := mocks.NewMockJobCompiler(mockCtrl)
ps := mocks.NewMockPersistenceService(mockCtrl)
cb := mocks.NewMockChangeBroadcaster(mockCtrl)
ls := mocks.NewMockLogStorage(mockCtrl)
cs := mocks.NewMockConfigService(mockCtrl)
sm := mocks.NewMockTaskStateMachine(mockCtrl)
sha := mocks.NewMockShaman(mockCtrl)
f := NewFlamenco(jc, ps, ls, cs, sm, sha)
f := NewFlamenco(jc, ps, cb, ls, cs, sm, sha)
return mockedFlamenco{
flamenco: f,
jobCompiler: jc,
persistence: ps,
broadcaster: cb,
logStorage: ls,
config: cs,
stateMachine: sm,

View File

@ -17,3 +17,7 @@ const (
SIOEventChatMessageSend SocketIOEventType = "/message" // messages are broadcasted here
SIOEventJobUpdate SocketIOEventType = "/jobs" // sends api.JobUpdate
)
func (b *BiDirComms) BroadcastTo(room SocketIORoomName, eventType SocketIOEventType, payload interface{}) {
b.sockserv.BroadcastTo(string(room), string(eventType), payload)
}

View File

@ -10,7 +10,7 @@ import (
// BroadcastJobUpdate sends the job update to clients.
func (b *BiDirComms) BroadcastJobUpdate(jobUpdate api.JobUpdate) {
log.Debug().Interface("jobUpdate", jobUpdate).Msg("socketIO: broadcasting job update")
b.sockserv.BroadcastTo(string(SocketIORoomJobs), "/jobs", jobUpdate)
b.BroadcastTo(SocketIORoomJobs, SIOEventJobUpdate, jobUpdate)
}
// BroadcastNewJob sends a "new job" notification to clients.
@ -21,5 +21,5 @@ func (b *BiDirComms) BroadcastNewJob(jobUpdate api.JobUpdate) {
}
log.Debug().Interface("jobUpdate", jobUpdate).Msg("socketIO: broadcasting new job")
b.sockserv.BroadcastTo(string(SocketIORoomJobs), "/jobs", jobUpdate)
b.BroadcastTo(SocketIORoomJobs, SIOEventJobUpdate, jobUpdate)
}

View File

@ -32,39 +32,28 @@ func socketIOServer() *gosocketio.Server {
sio := gosocketio.NewServer(transport.GetDefaultWebsocketTransport())
log.Info().Msg("initialising SocketIO")
var err error
// the sio.On() and c.Join() calls only return an error when there is no
// server connected to them, but that's not possible with our setup.
// Errors are explicitly silenced (by assigning to _) to reduce clutter.
// socket connection
err = sio.On(gosocketio.OnConnection, func(c *gosocketio.Channel) {
_ = sio.On(gosocketio.OnConnection, func(c *gosocketio.Channel) {
log.Debug().Str("clientID", c.Id()).Msg("socketIO: connected")
if err := c.Join(string(SocketIORoomChat)); err != nil {
log.Warn().Err(err).Str("clientID", c.Id()).Msg("socketIO: unable to make client join broadcast message room")
}
_ = c.Join(string(SocketIORoomChat)) // All clients connect to the chat room.
_ = c.Join(string(SocketIORoomJobs)) // All clients subscribe to job updates.
})
if err != nil {
log.Error().Err(err).Msg("socketIO: unable to register OnConnection handler")
}
// socket disconnection
err = sio.On(gosocketio.OnDisconnection, func(c *gosocketio.Channel) {
_ = sio.On(gosocketio.OnDisconnection, func(c *gosocketio.Channel) {
log.Debug().Str("clientID", c.Id()).Msg("socketIO: disconnected")
if err := c.Leave(string(SocketIORoomChat)); err != nil {
log.Warn().Err(err).Str("clientID", c.Id()).Msg("socketIO: unable to make client leave broadcast message room")
}
})
if err != nil {
log.Error().Err(err).Msg("socketIO: unable to register OnDisconnection handler")
}
err = sio.On(gosocketio.OnError, func(c *gosocketio.Channel) {
_ = sio.On(gosocketio.OnError, func(c *gosocketio.Channel) {
log.Warn().Interface("c", c).Msg("socketIO: socketio error")
})
if err != nil {
log.Error().Err(err).Msg("socketIO: unable to register OnError handler")
}
// chat socket
err = sio.On(string(SIOEventChatMessageRcv), func(c *gosocketio.Channel, message Message) string {
_ = sio.On(string(SIOEventChatMessageRcv), func(c *gosocketio.Channel, message Message) string {
log.Info().Str("clientID", c.Id()).
Str("text", message.Text).
Str("name", message.Name).
@ -72,9 +61,6 @@ func socketIOServer() *gosocketio.Server {
c.BroadcastTo(string(SocketIORoomChat), string(SIOEventChatMessageSend), message)
return "message sent successfully."
})
if err != nil {
log.Error().Err(err).Msg("socketIO: unable to register /chat handler")
}
return sio
}