From a715b3bfbeb8e4bc6aaf12b3c6aa3c2adb490caf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Tue, 5 Apr 2022 16:19:33 +0200 Subject: [PATCH] Manager: connect SocketIO broadcaster with job creation --- cmd/flamenco-manager/main.go | 2 +- internal/manager/api_impl/api_impl.go | 5 +- internal/manager/api_impl/jobs.go | 7 +++ internal/manager/api_impl/jobs_test.go | 59 +++++++++++++++++++ .../api_impl/mocks/api_impl_mock.gen.go | 37 +++++++++++- internal/manager/api_impl/support_test.go | 5 +- internal/manager/webupdates/chatrooms.go | 4 ++ internal/manager/webupdates/job_updates.go | 4 +- internal/manager/webupdates/webupdates.go | 32 +++------- 9 files changed, 126 insertions(+), 29 deletions(-) diff --git a/cmd/flamenco-manager/main.go b/cmd/flamenco-manager/main.go index 5baa3aec..44efd686 100644 --- a/cmd/flamenco-manager/main.go +++ b/cmd/flamenco-manager/main.go @@ -141,7 +141,7 @@ func buildFlamencoAPI(configService *config.Service, persist *persistence.DB, we logStorage := task_logs.NewStorage(configService.Get().TaskLogsPath) taskStateMachine := task_state_machine.NewStateMachine(persist, webUpdater) shamanServer := shaman.NewServer(configService.Get().Shaman, nil) - flamenco := api_impl.NewFlamenco(compiler, persist, logStorage, configService, taskStateMachine, shamanServer) + flamenco := api_impl.NewFlamenco(compiler, persist, webUpdater, logStorage, configService, taskStateMachine, shamanServer) return flamenco } diff --git a/internal/manager/api_impl/api_impl.go b/internal/manager/api_impl/api_impl.go index c36c7b1b..5a0604b5 100644 --- a/internal/manager/api_impl/api_impl.go +++ b/internal/manager/api_impl/api_impl.go @@ -21,6 +21,7 @@ import ( type Flamenco struct { jobCompiler JobCompiler persist PersistenceService + broadcaster ChangeBroadcaster logStorage LogStorage config ConfigService stateMachine TaskStateMachine @@ -30,7 +31,7 @@ type Flamenco struct { var _ api.ServerInterface = (*Flamenco)(nil) // Generate mock implementations of these interfaces. -//go:generate go run github.com/golang/mock/mockgen -destination mocks/api_impl_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/api_impl PersistenceService,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman +//go:generate go run github.com/golang/mock/mockgen -destination mocks/api_impl_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/api_impl PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman type PersistenceService interface { StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error @@ -123,6 +124,7 @@ var _ Shaman = (*shaman.Server)(nil) func NewFlamenco( jc JobCompiler, jps PersistenceService, + b ChangeBroadcaster, ls LogStorage, cs ConfigService, sm TaskStateMachine, @@ -131,6 +133,7 @@ func NewFlamenco( return &Flamenco{ jobCompiler: jc, persist: jps, + broadcaster: b, logStorage: ls, config: cs, stateMachine: sm, diff --git a/internal/manager/api_impl/jobs.go b/internal/manager/api_impl/jobs.go index e4a4c66e..62354412 100644 --- a/internal/manager/api_impl/jobs.go +++ b/internal/manager/api_impl/jobs.go @@ -67,6 +67,13 @@ func (f *Flamenco) SubmitJob(e echo.Context) error { return sendAPIError(e, http.StatusInternalServerError, "error retrieving job from database") } + jobUpdate := api.JobUpdate{ + Id: dbJob.UUID, + Status: dbJob.Status, + Updated: dbJob.UpdatedAt, + } + f.broadcaster.BroadcastNewJob(jobUpdate) + apiJob := jobDBtoAPI(dbJob) return e.JSON(http.StatusOK, apiJob) } diff --git a/internal/manager/api_impl/jobs_test.go b/internal/manager/api_impl/jobs_test.go index 40c767b5..1d058809 100644 --- a/internal/manager/api_impl/jobs_test.go +++ b/internal/manager/api_impl/jobs_test.go @@ -5,7 +5,9 @@ package api_impl import ( "context" "testing" + "time" + "git.blender.org/flamenco/internal/manager/job_compilers" "git.blender.org/flamenco/internal/manager/persistence" "git.blender.org/flamenco/pkg/api" "github.com/golang/mock/gomock" @@ -16,6 +18,63 @@ func ptr[T any](value T) *T { return &value } +func TestSubmitJob(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mf := newMockedFlamenco(mockCtrl) + worker := testWorker() + + submittedJob := api.SubmittedJob{ + Name: "поднео посао", + Type: "test", + Priority: 50, + } + + // Expect the job compiler to be called. + authoredJob := job_compilers.AuthoredJob{ + JobID: "afc47568-bd9d-4368-8016-e91d945db36d", + Name: submittedJob.Name, + JobType: submittedJob.Type, + Priority: submittedJob.Priority, + Status: api.JobStatusUnderConstruction, + Created: time.Now(), + } + mf.jobCompiler.EXPECT().Compile(gomock.Any(), submittedJob).Return(&authoredJob, nil) + + // Expect the job to be saved with 'queued' status: + queuedJob := authoredJob + queuedJob.Status = api.JobStatusQueued + mf.persistence.EXPECT().StoreAuthoredJob(gomock.Any(), queuedJob).Return(nil) + + // Expect the job to be fetched from the database again: + dbJob := persistence.Job{ + UUID: queuedJob.JobID, + Name: queuedJob.Name, + JobType: queuedJob.JobType, + Priority: queuedJob.Priority, + Status: queuedJob.Status, + Settings: persistence.StringInterfaceMap{}, + Metadata: persistence.StringStringMap{}, + } + mf.persistence.EXPECT().FetchJob(gomock.Any(), queuedJob.JobID).Return(&dbJob, nil) + + // Expect the new job to be broadcast. + jobUpdate := api.JobUpdate{ + Id: dbJob.UUID, + Updated: dbJob.UpdatedAt, + Status: dbJob.Status, + } + mf.broadcaster.EXPECT().BroadcastNewJob(jobUpdate) + + // Do the call. + echoCtx := mf.prepareMockedJSONRequest(submittedJob) + requestWorkerStore(echoCtx, &worker) + err := mf.flamenco.SubmitJob(echoCtx) + assert.NoError(t, err) + +} + func TestTaskUpdate(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index b3ce3e43..820306e1 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: git.blender.org/flamenco/internal/manager/api_impl (interfaces: PersistenceService,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman) +// Source: git.blender.org/flamenco/internal/manager/api_impl (interfaces: PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman) // Package mocks is a generated GoMock package. package mocks @@ -214,6 +214,41 @@ func (mr *MockPersistenceServiceMockRecorder) StoreAuthoredJob(arg0, arg1 interf return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StoreAuthoredJob", reflect.TypeOf((*MockPersistenceService)(nil).StoreAuthoredJob), arg0, arg1) } +// MockChangeBroadcaster is a mock of ChangeBroadcaster interface. +type MockChangeBroadcaster struct { + ctrl *gomock.Controller + recorder *MockChangeBroadcasterMockRecorder +} + +// MockChangeBroadcasterMockRecorder is the mock recorder for MockChangeBroadcaster. +type MockChangeBroadcasterMockRecorder struct { + mock *MockChangeBroadcaster +} + +// NewMockChangeBroadcaster creates a new mock instance. +func NewMockChangeBroadcaster(ctrl *gomock.Controller) *MockChangeBroadcaster { + mock := &MockChangeBroadcaster{ctrl: ctrl} + mock.recorder = &MockChangeBroadcasterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockChangeBroadcaster) EXPECT() *MockChangeBroadcasterMockRecorder { + return m.recorder +} + +// BroadcastNewJob mocks base method. +func (m *MockChangeBroadcaster) BroadcastNewJob(arg0 api.JobUpdate) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "BroadcastNewJob", arg0) +} + +// BroadcastNewJob indicates an expected call of BroadcastNewJob. +func (mr *MockChangeBroadcasterMockRecorder) BroadcastNewJob(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastNewJob", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastNewJob), arg0) +} + // MockJobCompiler is a mock of JobCompiler interface. type MockJobCompiler struct { ctrl *gomock.Controller diff --git a/internal/manager/api_impl/support_test.go b/internal/manager/api_impl/support_test.go index d2b205de..93c96877 100644 --- a/internal/manager/api_impl/support_test.go +++ b/internal/manager/api_impl/support_test.go @@ -22,6 +22,7 @@ type mockedFlamenco struct { flamenco *Flamenco jobCompiler *mocks.MockJobCompiler persistence *mocks.MockPersistenceService + broadcaster *mocks.MockChangeBroadcaster logStorage *mocks.MockLogStorage config *mocks.MockConfigService stateMachine *mocks.MockTaskStateMachine @@ -31,16 +32,18 @@ type mockedFlamenco struct { func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco { jc := mocks.NewMockJobCompiler(mockCtrl) ps := mocks.NewMockPersistenceService(mockCtrl) + cb := mocks.NewMockChangeBroadcaster(mockCtrl) ls := mocks.NewMockLogStorage(mockCtrl) cs := mocks.NewMockConfigService(mockCtrl) sm := mocks.NewMockTaskStateMachine(mockCtrl) sha := mocks.NewMockShaman(mockCtrl) - f := NewFlamenco(jc, ps, ls, cs, sm, sha) + f := NewFlamenco(jc, ps, cb, ls, cs, sm, sha) return mockedFlamenco{ flamenco: f, jobCompiler: jc, persistence: ps, + broadcaster: cb, logStorage: ls, config: cs, stateMachine: sm, diff --git a/internal/manager/webupdates/chatrooms.go b/internal/manager/webupdates/chatrooms.go index 96dec1a5..57806ba8 100644 --- a/internal/manager/webupdates/chatrooms.go +++ b/internal/manager/webupdates/chatrooms.go @@ -17,3 +17,7 @@ const ( SIOEventChatMessageSend SocketIOEventType = "/message" // messages are broadcasted here SIOEventJobUpdate SocketIOEventType = "/jobs" // sends api.JobUpdate ) + +func (b *BiDirComms) BroadcastTo(room SocketIORoomName, eventType SocketIOEventType, payload interface{}) { + b.sockserv.BroadcastTo(string(room), string(eventType), payload) +} diff --git a/internal/manager/webupdates/job_updates.go b/internal/manager/webupdates/job_updates.go index 297c6488..89aab145 100644 --- a/internal/manager/webupdates/job_updates.go +++ b/internal/manager/webupdates/job_updates.go @@ -10,7 +10,7 @@ import ( // BroadcastJobUpdate sends the job update to clients. func (b *BiDirComms) BroadcastJobUpdate(jobUpdate api.JobUpdate) { log.Debug().Interface("jobUpdate", jobUpdate).Msg("socketIO: broadcasting job update") - b.sockserv.BroadcastTo(string(SocketIORoomJobs), "/jobs", jobUpdate) + b.BroadcastTo(SocketIORoomJobs, SIOEventJobUpdate, jobUpdate) } // BroadcastNewJob sends a "new job" notification to clients. @@ -21,5 +21,5 @@ func (b *BiDirComms) BroadcastNewJob(jobUpdate api.JobUpdate) { } log.Debug().Interface("jobUpdate", jobUpdate).Msg("socketIO: broadcasting new job") - b.sockserv.BroadcastTo(string(SocketIORoomJobs), "/jobs", jobUpdate) + b.BroadcastTo(SocketIORoomJobs, SIOEventJobUpdate, jobUpdate) } diff --git a/internal/manager/webupdates/webupdates.go b/internal/manager/webupdates/webupdates.go index 760f8962..e18d0a00 100644 --- a/internal/manager/webupdates/webupdates.go +++ b/internal/manager/webupdates/webupdates.go @@ -32,39 +32,28 @@ func socketIOServer() *gosocketio.Server { sio := gosocketio.NewServer(transport.GetDefaultWebsocketTransport()) log.Info().Msg("initialising SocketIO") - var err error + // the sio.On() and c.Join() calls only return an error when there is no + // server connected to them, but that's not possible with our setup. + // Errors are explicitly silenced (by assigning to _) to reduce clutter. // socket connection - err = sio.On(gosocketio.OnConnection, func(c *gosocketio.Channel) { + _ = sio.On(gosocketio.OnConnection, func(c *gosocketio.Channel) { log.Debug().Str("clientID", c.Id()).Msg("socketIO: connected") - if err := c.Join(string(SocketIORoomChat)); err != nil { - log.Warn().Err(err).Str("clientID", c.Id()).Msg("socketIO: unable to make client join broadcast message room") - } + _ = c.Join(string(SocketIORoomChat)) // All clients connect to the chat room. + _ = c.Join(string(SocketIORoomJobs)) // All clients subscribe to job updates. }) - if err != nil { - log.Error().Err(err).Msg("socketIO: unable to register OnConnection handler") - } // socket disconnection - err = sio.On(gosocketio.OnDisconnection, func(c *gosocketio.Channel) { + _ = sio.On(gosocketio.OnDisconnection, func(c *gosocketio.Channel) { log.Debug().Str("clientID", c.Id()).Msg("socketIO: disconnected") - if err := c.Leave(string(SocketIORoomChat)); err != nil { - log.Warn().Err(err).Str("clientID", c.Id()).Msg("socketIO: unable to make client leave broadcast message room") - } }) - if err != nil { - log.Error().Err(err).Msg("socketIO: unable to register OnDisconnection handler") - } - err = sio.On(gosocketio.OnError, func(c *gosocketio.Channel) { + _ = sio.On(gosocketio.OnError, func(c *gosocketio.Channel) { log.Warn().Interface("c", c).Msg("socketIO: socketio error") }) - if err != nil { - log.Error().Err(err).Msg("socketIO: unable to register OnError handler") - } // chat socket - err = sio.On(string(SIOEventChatMessageRcv), func(c *gosocketio.Channel, message Message) string { + _ = sio.On(string(SIOEventChatMessageRcv), func(c *gosocketio.Channel, message Message) string { log.Info().Str("clientID", c.Id()). Str("text", message.Text). Str("name", message.Name). @@ -72,9 +61,6 @@ func socketIOServer() *gosocketio.Server { c.BroadcastTo(string(SocketIORoomChat), string(SIOEventChatMessageSend), message) return "message sent successfully." }) - if err != nil { - log.Error().Err(err).Msg("socketIO: unable to register /chat handler") - } return sio }