diff --git a/internal/manager/api_impl/api_impl.go b/internal/manager/api_impl/api_impl.go index 81f0db3d..fba6ca71 100644 --- a/internal/manager/api_impl/api_impl.go +++ b/internal/manager/api_impl/api_impl.go @@ -76,6 +76,10 @@ var _ TaskStateMachine = (*task_state_machine.StateMachine)(nil) type ChangeBroadcaster interface { // BroadcastNewJob sends a 'new job' notification to all SocketIO clients. BroadcastNewJob(jobUpdate api.JobUpdate) + + // Note that there is no BroadcastNewTask. The 'new job' broadcast is sent + // after the job's tasks have been created, and thus there is no need for a + // separate broadcast per task. } // ChangeBroadcaster should be a subset of webupdates.BiDirComms. diff --git a/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go b/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go index 7e930865..5b1ef49e 100644 --- a/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go +++ b/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go @@ -157,3 +157,15 @@ func (mr *MockChangeBroadcasterMockRecorder) BroadcastJobUpdate(arg0 interface{} mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastJobUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastJobUpdate), arg0) } + +// BroadcastTaskUpdate mocks base method. +func (m *MockChangeBroadcaster) BroadcastTaskUpdate(arg0 api.SocketIOTaskUpdate) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "BroadcastTaskUpdate", arg0) +} + +// BroadcastTaskUpdate indicates an expected call of BroadcastTaskUpdate. +func (mr *MockChangeBroadcasterMockRecorder) BroadcastTaskUpdate(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastTaskUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastTaskUpdate), arg0) +} diff --git a/internal/manager/task_state_machine/task_state_machine.go b/internal/manager/task_state_machine/task_state_machine.go index 3e7e2163..bdf81285 100644 --- a/internal/manager/task_state_machine/task_state_machine.go +++ b/internal/manager/task_state_machine/task_state_machine.go @@ -48,8 +48,11 @@ type PersistenceService interface { var _ PersistenceService = (*persistence.DB)(nil) type ChangeBroadcaster interface { - // BroadcastJobUpdate sends the job update to clients. + // BroadcastJobUpdate sends the job update to SocketIO clients. BroadcastJobUpdate(jobUpdate api.JobUpdate) + + // BroadcastTaskUpdate sends the task update to SocketIO clients. + BroadcastTaskUpdate(jobUpdate api.SocketIOTaskUpdate) } // ChangeBroadcaster should be a subset of webupdates.BiDirComms @@ -89,6 +92,12 @@ func (sm *StateMachine) TaskStatusChange( if err := sm.persist.SaveTask(ctx, task); err != nil { return fmt.Errorf("saving task to database: %w", err) } + + // Broadcast this change to the SocketIO clients. + taskUpdate := webupdates.NewTaskUpdate(task) + taskUpdate.PreviousStatus = &oldTaskStatus + sm.broadcaster.BroadcastTaskUpdate(taskUpdate) + if err := sm.updateJobAfterTaskStatusChange(ctx, task, oldTaskStatus); err != nil { return fmt.Errorf("updating job after task status change: %w", err) } diff --git a/internal/manager/webupdates/job_updates.go b/internal/manager/webupdates/job_updates.go index 04df7620..c7dd5bd3 100644 --- a/internal/manager/webupdates/job_updates.go +++ b/internal/manager/webupdates/job_updates.go @@ -24,6 +24,24 @@ func NewJobUpdate(job *persistence.Job) api.JobUpdate { return jobUpdate } +// NewTaskUpdate returns a partial TaskUpdate struct for the given task. It only +// fills in the fields that represent the current state of the task. For +// example, it omits `PreviousStatus`. The omitted fields can be filled in by +// the caller. +// +// Assumes task.Job is not nil. +func NewTaskUpdate(task *persistence.Task) api.SocketIOTaskUpdate { + taskUpdate := api.SocketIOTaskUpdate{ + Id: task.UUID, + JobId: task.Job.UUID, + Name: task.Name, + Updated: task.UpdatedAt, + Status: task.Status, + } + return taskUpdate + +} + // BroadcastJobUpdate sends the job update to clients. func (b *BiDirComms) BroadcastJobUpdate(jobUpdate api.JobUpdate) { log.Debug().Interface("jobUpdate", jobUpdate).Msg("socketIO: broadcasting job update") @@ -31,6 +49,8 @@ func (b *BiDirComms) BroadcastJobUpdate(jobUpdate api.JobUpdate) { } // BroadcastNewJob sends a "new job" notification to clients. +// This function should be called when the job has been completely created, so +// including its tasks. func (b *BiDirComms) BroadcastNewJob(jobUpdate api.JobUpdate) { if jobUpdate.PreviousStatus != nil { log.Warn().Interface("jobUpdate", jobUpdate).Msg("socketIO: new jobs should not have a previous state") @@ -41,6 +61,13 @@ func (b *BiDirComms) BroadcastNewJob(jobUpdate api.JobUpdate) { b.BroadcastTo(SocketIORoomJobs, SIOEventJobUpdate, jobUpdate) } +// BroadcastTaskUpdate sends the task update to clients. +func (b *BiDirComms) BroadcastTaskUpdate(taskUpdate api.SocketIOTaskUpdate) { + log.Debug().Interface("taskUpdate", taskUpdate).Msg("socketIO: broadcasting task update") + room := roomForJob(taskUpdate.JobId) + b.BroadcastTo(room, SIOEventTaskUpdate, taskUpdate) +} + // roomForJob will return the SocketIO room name for the given job. Clients in // this room will receive info scoped to this job, so for example updates to all // tasks of this job. diff --git a/internal/manager/webupdates/sio_rooms.go b/internal/manager/webupdates/sio_rooms.go index c49f9abb..6ead2691 100644 --- a/internal/manager/webupdates/sio_rooms.go +++ b/internal/manager/webupdates/sio_rooms.go @@ -24,6 +24,7 @@ const ( SIOEventChatMessageRcv SocketIOEventType = "/chat" // clients send chat messages here SIOEventChatMessageSend SocketIOEventType = "/message" // chat messages are broadcasted here SIOEventJobUpdate SocketIOEventType = "/jobs" // sends api.JobUpdate + SIOEventTaskUpdate SocketIOEventType = "/task" // sends api.SocketIOTaskUpdate SIOEventSubscription SocketIOEventType = "/subscription" // clients send api.SocketIOSubscription ) diff --git a/web/app/src/App.vue b/web/app/src/App.vue index 3ec26aa8..9b13e17f 100644 --- a/web/app/src/App.vue +++ b/web/app/src/App.vue @@ -17,8 +17,8 @@ @@ -61,7 +61,9 @@ export default { this.fetchManagerInfo(); }, methods: { - // UI component event handlers: + // onSelectedJobChanged is called whenever the selected job changes; this is + // both when another job is selected and when the selected job itself gets + // updated. onSelectedJobChanged(jobSummary) { if (!jobSummary) { // There is no selected job. this.jobs.deselectAllJobs(); @@ -108,8 +110,8 @@ export default { } else { console.warn("App: this.$refs.jobsTable is", this.$refs.jobsTable); } - const activeJob = this.jobs.activeJob; - if (activeJob && activeJob.id == jobUpdate.id) { + + if (this.jobs.activeJobID == jobUpdate.id) { this.onSelectedJobChanged(jobUpdate); } }, @@ -122,6 +124,20 @@ export default { // this.messages.push(`New job: ${jobUpdate.id} (${jobUpdate.status})`); this.$refs.jobsTable.processNewJob(jobUpdate); }, + + /** + * Event handler for SocketIO task updates. + * @param {API.SocketIOTaskUpdate} taskUpdate + */ + onSioTaskUpdate(taskUpdate) { + if (!this.$refs.tasksTable) { + console.warn("App: this.$refs.tasksTable is", this.$refs.tasksTable); + return; + } + + this.$refs.tasksTable.processTaskUpdate(taskUpdate); + }, + onChatMessage(message) { console.log("chat message received:", message); this.messages.push(`${message.text}`); diff --git a/web/app/src/components/TasksTable.vue b/web/app/src/components/TasksTable.vue index 7926192b..8295da67 100644 --- a/web/app/src/components/TasksTable.vue +++ b/web/app/src/components/TasksTable.vue @@ -109,10 +109,6 @@ export default { this.tabulator.updateData([taskUpdate]) .then(this.sortData); }, - processNewTask(taskUpdate) { - this.tabulator.addData([taskUpdate]) - .then(this.sortData); - }, // Selection handling. onRowSelected(selectedRow) { diff --git a/web/app/src/components/UpdateListener.vue b/web/app/src/components/UpdateListener.vue index 081ce7cc..4d81ca10 100644 --- a/web/app/src/components/UpdateListener.vue +++ b/web/app/src/components/UpdateListener.vue @@ -26,7 +26,10 @@ export default { } this.connectToWebsocket(); }, - beforeDestroy: function () { + unmounted() { + this.disconnectWebsocket(); + }, + beforeDestroy() { this.disconnectWebsocket(); }, watch: {